diff --git a/swh/web/ui/renderers.py b/swh/web/ui/renderers.py index d617bd3d..7aedf34d 100644 --- a/swh/web/ui/renderers.py +++ b/swh/web/ui/renderers.py @@ -1,145 +1,202 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import re import yaml import json from docutils.core import publish_parts from docutils.writers.html4css1 import Writer, HTMLTranslator from inspect import cleandoc from flask import request, Response, render_template from flask import g from swh.web.ui import utils class SWHFilterEnricher(): """Global filter on fields. """ - def filter_by_fields(self, data): + @classmethod + def filter_by_fields(cls, data): """Extract a request parameter 'fields' if it exists to permit the filtering on the data dict's keys. If such field is not provided, returns the data as is. """ fields = request.args.get('fields') if fields: fields = set(fields.split(',')) data = utils.filter_field_keys(data, fields) return data -class SWHMultiResponse(Response, SWHFilterEnricher): +class SWHAddLinkHeaderEnricher: + """Add link header to response. + + Mixin intended to be used for example in SWHMultiResponse + + """ + @classmethod + def add_link_header(cls, rv, options): + """Add Link header in returned value results. + + Args: + rv (dict): with keys: + - 'headers': potential headers with 'link-next' + and 'link-prev' keys + - 'results': containing the result to return + + Returns: + tuple rv, options: + + If link-headers are present, rv is the returned value + present in the 'results' key. Also, options is updated + with headers 'Link' containing the 'link-next' and + 'link-prev' headers. + + Otherwise, rv, options stays the same as the input. 
+ + """ + link_headers = [] + + if 'headers' not in rv: + return rv, options + + rv_headers = rv['headers'] + + if 'link-next' in rv_headers: + link_headers.append('<%s>; rel="next"' % ( + rv_headers['link-next'])) + if 'link-prev' in rv_headers: + link_headers.append('<%s>; rel="previous"' % ( + rv_headers['link-prev'])) + + if link_headers: + link_header_str = ','.join(link_headers) + headers = options.get('headers', {}) + headers.update({ + 'Link': link_header_str + }) + options['headers'] = headers + rv.pop('headers') + return rv['results'], options + + return rv, options + + +class SWHMultiResponse(Response, SWHFilterEnricher, SWHAddLinkHeaderEnricher): """ A Flask Response subclass. Override force_type to transform dict/list responses into callable Flask response objects whose mimetype matches the request's Accept header: HTML template render, YAML dump or default to a JSON dump. """ @classmethod def make_response_from_mimetype(cls, rv, options={}): if not (isinstance(rv, list) or isinstance(rv, dict)): return rv def wants_html(best_match): return best_match == 'text/html' and \ request.accept_mimetypes[best_match] > \ request.accept_mimetypes['application/json'] def wants_yaml(best_match): return best_match == 'application/yaml' and \ request.accept_mimetypes[best_match] > \ request.accept_mimetypes['application/json'] - rv = cls.filter_by_fields(cls, rv) + rv = cls.filter_by_fields(rv) acc_mime = ['application/json', 'application/yaml', 'text/html'] best_match = request.accept_mimetypes.best_match(acc_mime) + rv, options = cls.add_link_header(rv, options) + if wants_html(best_match): data = json.dumps(rv, sort_keys=True, indent=4, separators=(',', ': ')) env = g.get('doc_env', {}) env['response_data'] = data env['request'] = request rv = Response(render_template('apidoc.html', **env), content_type='text/html', **options) elif wants_yaml(best_match): rv = Response( yaml.dump(rv), content_type='application/yaml', **options) else: # jsonify is unhappy with 
lists in Flask 0.10.1, use json.dumps rv = Response( json.dumps(rv), content_type='application/json', **options) return rv @classmethod def force_type(cls, rv, environ=None): if isinstance(rv, dict) or isinstance(rv, list): rv = cls.make_response_from_mimetype(rv) return super().force_type(rv, environ) def error_response(error_code, error): """Private function to create a custom error response. """ error_opts = {'status': error_code} error_data = {'error': str(error)} return SWHMultiResponse.make_response_from_mimetype(error_data, options=error_opts) def urlize_api_links(content): """Utility function for decorating api links in browsable api.""" return re.sub(r'"(/api/.*|/browse/.*)"', r'"\1"', content) class NoHeaderHTMLTranslator(HTMLTranslator): """ Docutils translator subclass to customize the generation of HTML from reST-formatted docstrings """ def __init__(self, document): super().__init__(document) self.body_prefix = [] self.body_suffix = [] def visit_bullet_list(self, node): self.context.append((self.compact_simple, self.compact_p)) self.compact_p = None self.compact_simple = self.is_compactable(node) self.body.append(self.starttag(node, 'ul', CLASS='docstring')) DOCSTRING_WRITER = Writer() DOCSTRING_WRITER.translator_class = NoHeaderHTMLTranslator def safe_docstring_display(docstring): """ Utility function to htmlize reST-formatted documentation in browsable api. 
""" docstring = cleandoc(docstring) return publish_parts(docstring, writer=DOCSTRING_WRITER)['html_body'] def revision_id_from_url(url): """Utility function to obtain a revision's ID from its browsing URL.""" return re.sub(r'/browse/revision/([0-9a-f]{40}|[0-9a-f]{64})/.*', r'\1', url) diff --git a/swh/web/ui/tests/test_renderers.py b/swh/web/ui/tests/test_renderers.py index a74e7492..a7bb278a 100644 --- a/swh/web/ui/tests/test_renderers.py +++ b/swh/web/ui/tests/test_renderers.py @@ -1,233 +1,281 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import unittest import yaml from flask import Response from nose.tools import istest from unittest.mock import patch, MagicMock from swh.web.ui import renderers +class SWHAddLinkHeaderEnricherTest(unittest.TestCase): + @istest + def add_link_header(self): + rv = { + 'headers': {'link-next': 'foo', 'link-prev': 'bar'}, + 'results': [1, 2, 3] + } + options = {} + + # when + _rv, _options = renderers.SWHAddLinkHeaderEnricher.add_link_header( + rv, options) + + self.assertEquals(_rv, [1, 2, 3]) + self.assertEquals(_options, {'headers': { + 'Link': '; rel="next",; rel="previous"', + }}) + + @istest + def add_link_header_nothing_changed(self): + rv = {} + options = {} + + # when + _rv, _options = renderers.SWHAddLinkHeaderEnricher.add_link_header( + rv, options) + + self.assertEquals(_rv, {}) + self.assertEquals(_options, {}) + + @istest + def add_link_header_nothing_changed_2(self): + rv = {'headers': {}} + options = {} + + # when + _rv, _options = renderers.SWHAddLinkHeaderEnricher.add_link_header( + rv, options) + + self.assertEquals(_rv, {'headers': {}}) + self.assertEquals(_options, {}) + + class RendererTestCase(unittest.TestCase): @patch('swh.web.ui.renderers.g') @patch('swh.web.ui.renderers.json') 
@patch('swh.web.ui.renderers.request') @patch('swh.web.ui.renderers.render_template') @patch('swh.web.ui.renderers.SWHMultiResponse.filter_by_fields') @istest - def swh_multi_response_mimetype_html(self, mock_filter, mock_render, - mock_request, mock_json, mock_g): + def swh_multi_response_mimetype_html(self, mock_filter, + mock_render, mock_request, mock_json, + mock_g): # given - data = {'data': [12, 34], - 'id': 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'} + data = { + 'data': [12, 34], + 'id': 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc' + } mock_g.get.return_value = {'my_key': 'my_display_value'} + # mock_enricher.return_value = (data, {}) mock_filter.return_value = data expected_env = { 'my_key': 'my_display_value', 'response_data': json.dumps(data), 'request': mock_request } def mock_mimetypes(key): mimetypes = { 'text/html': 10, 'application/json': 0.1, 'application/yaml': 0.1 } return mimetypes[key] accept_mimetypes = MagicMock() accept_mimetypes.__getitem__.side_effect = mock_mimetypes accept_mimetypes.best_match = MagicMock(return_value='text/html') mock_request.accept_mimetypes = accept_mimetypes mock_json.dumps.return_value = json.dumps(data) # when rv = renderers.SWHMultiResponse.make_response_from_mimetype(data) # then - mock_filter.assert_called_once_with(renderers.SWHMultiResponse, data) + # mock_enricher.assert_called_once_with(data, {}) + mock_filter.assert_called_once_with(data) mock_render.assert_called_with('apidoc.html', **expected_env) self.assertEqual(rv.status_code, 200) self.assertEqual(rv.mimetype, 'text/html') @patch('swh.web.ui.renderers.g') @patch('swh.web.ui.renderers.yaml') @patch('swh.web.ui.renderers.request') @patch('swh.web.ui.renderers.SWHMultiResponse.filter_by_fields') @istest def swh_multi_response_mimetype_yaml(self, mock_filter, mock_request, mock_yaml, mock_g): # given data = {'data': [12, 34], 'id': 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'} def mock_mimetypes(key): mimetypes = { 'application/yaml': 10, 'application/json': 
0.1, 'text/html': 0.1 } return mimetypes[key] accept_mimetypes = MagicMock() accept_mimetypes.__getitem__.side_effect = mock_mimetypes accept_mimetypes.best_match = MagicMock( return_value='application/yaml') mock_request.accept_mimetypes = accept_mimetypes mock_yaml.dump.return_value = yaml.dump(data) mock_filter.return_value = data # when rv = renderers.SWHMultiResponse.make_response_from_mimetype(data) # then - mock_filter.assert_called_once_with(renderers.SWHMultiResponse, data) + mock_filter.assert_called_once_with(data) mock_yaml.dump.assert_called_once_with(data) self.assertEqual(rv.status_code, 200) self.assertEqual(rv.mimetype, 'application/yaml') self.assertEqual(data, yaml.load(rv.data.decode('utf-8'))) @patch('swh.web.ui.renderers.g') @patch('swh.web.ui.renderers.json') @patch('swh.web.ui.renderers.request') @patch('swh.web.ui.renderers.SWHMultiResponse.filter_by_fields') @istest def swh_multi_response_mimetype_json(self, mock_filter, mock_request, mock_json, mock_g): # given data = {'data': [12, 34], 'id': 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'} def mock_mimetypes(key): mimetypes = { 'application/json': 10, 'text/html': 0.1, 'application/yaml': 0.1 } return mimetypes[key] accept_mimetypes = MagicMock() accept_mimetypes.__getitem__.side_effect = mock_mimetypes accept_mimetypes.best_match = MagicMock( return_value='application/json') mock_request.accept_mimetypes = accept_mimetypes mock_json.dumps.return_value = json.dumps(data) mock_filter.return_value = data # when rv = renderers.SWHMultiResponse.make_response_from_mimetype(data) # then - mock_filter.assert_called_once_with(renderers.SWHMultiResponse, data) + mock_filter.assert_called_once_with(data) mock_json.dumps.assert_called_once_with(data) self.assertEqual(rv.status_code, 200) self.assertEqual(rv.mimetype, 'application/json') self.assertEqual(data, json.loads(rv.data.decode('utf-8'))) @patch('swh.web.ui.renderers.request') @istest def swh_multi_response_make_response_not_list_dict(self, 
mock_request): # given incoming = Response() # when rv = renderers.SWHMultiResponse.make_response_from_mimetype(incoming) # then self.assertEqual(rv, incoming) @patch('swh.web.ui.renderers.request') @istest def swh_filter_renderer_do_nothing(self, mock_request): # given mock_request.args = {} swh_filter_renderer = renderers.SWHFilterEnricher() input_data = {'a': 'some-data'} # when actual_data = swh_filter_renderer.filter_by_fields(input_data) # then self.assertEquals(actual_data, input_data) @patch('swh.web.ui.renderers.utils') @patch('swh.web.ui.renderers.request') @istest def swh_filter_renderer_do_filter(self, mock_request, mock_utils): # given mock_request.args = {'fields': 'a,c'} mock_utils.filter_field_keys.return_value = {'a': 'some-data'} swh_filter_user = renderers.SWHMultiResponse() input_data = {'a': 'some-data', 'b': 'some-other-data'} # when actual_data = swh_filter_user.filter_by_fields(input_data) # then self.assertEquals(actual_data, {'a': 'some-data'}) mock_utils.filter_field_keys.assert_called_once_with(input_data, {'a', 'c'}) @istest def urlize_api_links(self): # update api link with html links content with links content = '{"url": "/api/1/abc/"}' expected_content = '{"url": "/api/1/abc/"}' self.assertEquals(renderers.urlize_api_links(content), expected_content) # update /browse link with html links content with links content = '{"url": "/browse/def/"}' expected_content = '{"url": "' \ '/browse/def/"}' self.assertEquals(renderers.urlize_api_links(content), expected_content) # will do nothing since it's not an api url other_content = '{"url": "/something/api/1/other"}' self.assertEquals(renderers.urlize_api_links(other_content), other_content) @istest def revision_id_from_url(self): url = ('/browse/revision/9ba4bcb645898d562498ea66a0df958ef0e7a68c/' 'prev/9ba4bcb645898d562498ea66a0df958ef0e7aaaa/') expected_id = '9ba4bcb645898d562498ea66a0df958ef0e7a68c' self.assertEqual(renderers.revision_id_from_url(url), expected_id) @istest def 
safe_docstring_display(self): # update api link with html links content with links docstring = """This is my list header: - Here is item 1, with a continuation line right here - Here is item 2 Here is something that is not part of the list""" expected_docstring = """

This is my list header:

Here is something that is not part of the list

""" self.assertEquals(renderers.safe_docstring_display(docstring), expected_docstring) diff --git a/swh/web/ui/tests/test_utils.py b/swh/web/ui/tests/test_utils.py index 49c13bc3..d7f4f545 100644 --- a/swh/web/ui/tests/test_utils.py +++ b/swh/web/ui/tests/test_utils.py @@ -1,853 +1,879 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import dateutil import unittest from unittest.mock import patch, call from nose.tools import istest, nottest from swh.web.ui import utils class UtilsTestCase(unittest.TestCase): def setUp(self): self.url_map = [dict(rule='/other/', methods=set(['GET', 'POST', 'HEAD']), endpoint='foo'), dict(rule='/some/old/url/', methods=set(['GET', 'POST']), endpoint='blablafn'), dict(rule='/other/old/url/', methods=set(['GET', 'HEAD']), endpoint='bar'), dict(rule='/other', methods=set([]), endpoint=None), dict(rule='/other2', methods=set([]), endpoint=None)] @istest def filter_endpoints_1(self): # when actual_data = utils.filter_endpoints(self.url_map, '/some') # then self.assertEquals(actual_data, { '/some/old/url/': { 'methods': ['GET', 'POST'], 'endpoint': 'blablafn' } }) @istest def filter_endpoints_2(self): # when actual_data = utils.filter_endpoints(self.url_map, '/other', blacklist=['/other2']) # then # rules /other is skipped because its' exactly the prefix url # rules /other2 is skipped because it's blacklisted self.assertEquals(actual_data, { '/other/': { 'methods': ['GET', 'HEAD', 'POST'], 'endpoint': 'foo' }, '/other/old/url/': { 'methods': ['GET', 'HEAD'], 'endpoint': 'bar' } }) @istest def prepare_data_for_view_default_encoding(self): self.maxDiff = None # given inputs = [ { 'data': b'some blah data' }, { 'data': 1, 'data_url': '/api/1/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' }] # 
when actual_result = utils.prepare_data_for_view(inputs) # then self.assertEquals(actual_result, [ { 'data': 'some blah data', }, { 'data': 1, 'data_url': '/browse/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' } ]) @istest def prepare_data_for_view(self): self.maxDiff = None # given inputs = [ { 'data': b'some blah data' }, { 'data': 1, 'data_url': '/api/1/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' }] # when actual_result = utils.prepare_data_for_view(inputs, encoding='ascii') # then self.assertEquals(actual_result, [ { 'data': 'some blah data', }, { 'data': 1, 'data_url': '/browse/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' } ]) @istest def prepare_data_for_view_ko_cannot_decode(self): self.maxDiff = None # given inputs = { 'data': 'hé dude!'.encode('utf8'), } actual_result = utils.prepare_data_for_view(inputs, encoding='ascii') # then self.assertEquals(actual_result, { 'data': "Cannot decode the data bytes, try and set another " "encoding in the url (e.g. 
?encoding=utf8) or " "download directly the " "content's raw data.", }) @istest def filter_field_keys_dict_unknown_keys(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory1', 'file2'}) # then self.assertEqual(actual_res, {}) @istest def filter_field_keys_dict(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory', 'link'}) # then self.assertEqual(actual_res, {'directory': 1, 'link': 3}) @istest def filter_field_keys_list_unknown_keys(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'1': 1, '2': 2, 'link': 3}], {'d'}) # then self.assertEqual(actual_res, [{}, {}]) @istest def filter_field_keys_list(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'dir': 1, 'fil': 2, 'lin': 3}], {'directory', 'dir'}) # then self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}]) @istest def filter_field_keys_other(self): # given input_set = {1, 2} # when actual_res = utils.filter_field_keys(input_set, {'a', '1'}) # then self.assertEqual(actual_res, input_set) @istest def fmap(self): self.assertEquals([2, 3, None, 4], utils.fmap(lambda x: x+1, [1, 2, None, 3])) self.assertEquals([11, 12, 13], list(utils.fmap(lambda x: x+10, map(lambda x: x, [1, 2, 3])))) self.assertEquals({'a': 2, 'b': 4}, utils.fmap(lambda x: x*2, {'a': 1, 'b': 2})) self.assertEquals(100, utils.fmap(lambda x: x*10, 10)) self.assertEquals({'a': [2, 6], 'b': 4}, utils.fmap(lambda x: x*2, {'a': [1, 3], 'b': 2})) self.assertIsNone(utils.fmap(lambda x: x, None)) @istest def person_to_string(self): self.assertEqual(utils.person_to_string(dict(name='raboof', email='foo@bar')), 'raboof ') @istest def parse_timestamp(self): input_timestamps = [ '2016-01-12', '2016-01-12T09:19:12+0100', 'Today is January 1, 2047 at 8:21:00AM', '1452591542', ] output_dates = [ datetime.datetime(2016, 1, 12, 0, 0, tzinfo=datetime.timezone.utc), 
datetime.datetime(2016, 1, 12, 9, 19, 12, tzinfo=dateutil.tz.tzoffset(None, 3600)), datetime.datetime(2047, 1, 1, 8, 21, tzinfo=datetime.timezone.utc), datetime.datetime(2016, 1, 12, 9, 39, 2, tzinfo=datetime.timezone.utc), ] for ts, exp_date in zip(input_timestamps, output_dates): self.assertEquals(utils.parse_timestamp(ts), exp_date) @istest def enrich_release_0(self): # when actual_release = utils.enrich_release({}) # then self.assertEqual(actual_release, {}) @patch('swh.web.ui.utils.flask') @istest def enrich_release_1(self, mock_flask): # given mock_flask.url_for.return_value = '/api/1/content/sha1_git:123/' # when actual_release = utils.enrich_release({'target': '123', 'target_type': 'content'}) # then self.assertEqual(actual_release, { 'target': '123', 'target_type': 'content', 'target_url': '/api/1/content/sha1_git:123/' }) mock_flask.url_for.assert_called_once_with('api_content_metadata', q='sha1_git:123') @patch('swh.web.ui.utils.flask') @istest def enrich_release_2(self, mock_flask): # given mock_flask.url_for.return_value = '/api/1/dir/23/' # when actual_release = utils.enrich_release({'target': '23', 'target_type': 'directory'}) # then self.assertEqual(actual_release, { 'target': '23', 'target_type': 'directory', 'target_url': '/api/1/dir/23/' }) mock_flask.url_for.assert_called_once_with('api_directory', q='23') @patch('swh.web.ui.utils.flask') @istest def enrich_release_3(self, mock_flask): # given mock_flask.url_for.return_value = '/api/1/rev/3/' # when actual_release = utils.enrich_release({'target': '3', 'target_type': 'revision'}) # then self.assertEqual(actual_release, { 'target': '3', 'target_type': 'revision', 'target_url': '/api/1/rev/3/' }) mock_flask.url_for.assert_called_once_with('api_revision', sha1_git='3') @patch('swh.web.ui.utils.flask') @istest def enrich_release_4(self, mock_flask): # given mock_flask.url_for.return_value = '/api/1/rev/4/' # when actual_release = utils.enrich_release({'target': '4', 'target_type': 'release'}) # then 
self.assertEqual(actual_release, { 'target': '4', 'target_type': 'release', 'target_url': '/api/1/rev/4/' }) mock_flask.url_for.assert_called_once_with('api_release', sha1_git='4') @patch('swh.web.ui.utils.flask') @istest def enrich_directory_no_type(self, mock_flask): # when/then self.assertEqual(utils.enrich_directory({'id': 'dir-id'}), {'id': 'dir-id'}) # given mock_flask.url_for.return_value = '/api/content/sha1_git:123/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'file', 'target': '123', }) # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'file', 'target': '123', 'target_url': '/api/content/sha1_git:123/', }) mock_flask.url_for.assert_called_once_with('api_content_metadata', q='sha1_git:123') @patch('swh.web.ui.utils.flask') @istest def enrich_directory_with_context_and_type_file(self, mock_flask): # given mock_flask.url_for.return_value = '/api/content/sha1_git:123/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'file', 'name': 'hy', 'target': '789', }, context_url='/api/revision/revsha1/directory/prefix/path/') # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'file', 'name': 'hy', 'target': '789', 'target_url': '/api/content/sha1_git:123/', 'file_url': '/api/revision/revsha1/directory' '/prefix/path/hy/' }) mock_flask.url_for.assert_called_once_with('api_content_metadata', q='sha1_git:789') @patch('swh.web.ui.utils.flask') @istest def enrich_directory_with_context_and_type_dir(self, mock_flask): # given mock_flask.url_for.return_value = '/api/directory/456/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'dir', 'name': 'emacs-42', 'target_type': 'file', 'target': '456', }, context_url='/api/revision/origin/2/directory/some/prefix/path/') # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'dir', 'target_type': 'file', 'name': 'emacs-42', 'target': '456', 'target_url': '/api/directory/456/', 'dir_url': 
'/api/revision/origin/2/directory' '/some/prefix/path/emacs-42/' }) mock_flask.url_for.assert_called_once_with('api_directory', sha1_git='456') @istest def enrich_content_without_hashes(self): # when/then self.assertEqual(utils.enrich_content({'id': '123'}), {'id': '123'}) @patch('swh.web.ui.utils.flask') @istest def enrich_content_with_hashes(self, mock_flask): for h in ['sha1', 'sha256', 'sha1_git']: # given mock_flask.url_for.side_effect = [ '/api/content/%s:123/raw/' % h, '/api/filetype/%s:123/' % h, '/api/language/%s:123/' % h, '/api/license/%s:123/' % h, ] # when enriched_content = utils.enrich_content( { 'id': '123', h: 'blahblah' } ) # then self.assertEqual( enriched_content, { 'id': '123', h: 'blahblah', 'data_url': '/api/content/%s:123/raw/' % h, 'filetype_url': '/api/filetype/%s:123/' % h, 'language_url': '/api/language/%s:123/' % h, 'license_url': '/api/license/%s:123/' % h, } ) mock_flask.url_for.assert_has_calls([ call('api_content_raw', q='%s:blahblah' % h), call('api_content_filetype', q='%s:blahblah' % h), call('api_content_language', q='%s:blahblah' % h), call('api_content_license', q='%s:blahblah' % h), ]) mock_flask.reset() @istest def enrich_entity_identity(self): # when/then self.assertEqual(utils.enrich_content({'id': '123'}), {'id': '123'}) @patch('swh.web.ui.utils.flask') @istest def enrich_entity_with_sha1(self, mock_flask): # given def url_for_test(fn, **entity): return '/api/entity/' + entity['uuid'] + '/' mock_flask.url_for.side_effect = url_for_test # when actual_entity = utils.enrich_entity({ 'uuid': 'uuid-1', 'parent': 'uuid-parent', 'name': 'something' }) # then self.assertEqual(actual_entity, { 'uuid': 'uuid-1', 'uuid_url': '/api/entity/uuid-1/', 'parent': 'uuid-parent', 'parent_url': '/api/entity/uuid-parent/', 'name': 'something', }) mock_flask.url_for.assert_has_calls([call('api_entity_by_uuid', uuid='uuid-1'), call('api_entity_by_uuid', uuid='uuid-parent')]) @nottest def _url_for_context_test(self, fn, **data): if fn == 
'api_revision': if 'context' in data and data['context'] is not None: return '/api/revision/%s/prev/%s/' % (data['sha1_git'], data['context']) # noqa else: return '/api/revision/%s/' % data['sha1_git'] elif fn == 'api_revision_log': if 'prev_sha1s' in data: return '/api/revision/%s/prev/%s/log/' % (data['sha1_git'], data['prev_sha1s']) # noqa else: return '/api/revision/%s/log/' % data['sha1_git'] @patch('swh.web.ui.utils.flask') @istest def enrich_revision_without_children_or_parent(self, mock_flask): # given def url_for_test(fn, **data): if fn == 'api_revision': return '/api/revision/' + data['sha1_git'] + '/' elif fn == 'api_revision_log': return '/api/revision/' + data['sha1_git'] + '/log/' elif fn == 'api_directory': return '/api/directory/' + data['sha1_git'] + '/' elif fn == 'api_person': return '/api/person/' + data['person_id'] + '/' mock_flask.url_for.side_effect = url_for_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'directory': '123', 'author': {'id': '1'}, 'committer': {'id': '2'}, }) expected_revision = { 'id': 'rev-id', 'directory': '123', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'directory_url': '/api/directory/123/', 'author': {'id': '1'}, 'author_url': '/api/person/1/', 'committer': {'id': '2'}, 'committer_url': '/api/person/2/' } # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_person', person_id='1'), call('api_person', person_id='2'), call('api_directory', sha1_git='123')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_with_children_and_parent_no_dir(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_context_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456'], }, context='prev-rev') expected_revision = { 'id': 'rev-id', 'url': 
'/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': '/api/revision/rev-id/prev/prev-rev/log/', 'parents': ['123'], 'parent_urls': ['/api/revision/123/prev/prev-rev/rev-id/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev-rev'), call('api_revision', sha1_git='123', context='prev-rev/rev-id'), call('api_revision', sha1_git='456')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_no_context(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_context_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456'], }) expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'parents': ['123'], 'parent_urls': ['/api/revision/123/prev/rev-id/'], 'children': ['456'], 'children_urls': ['/api/revision/456/'] } # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision', sha1_git='123', context='rev-id'), call('api_revision', sha1_git='456')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_context_empty_prev_list(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_context_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'parents': ['123'], 'parent_urls': ['/api/revision/123/prev/prev-rev/rev-id/'], 'children': ['456'], 'children_urls': 
['/api/revision/456/', '/api/revision/prev-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'parents': ['123'], 'children': ['456']}, context='prev-rev') # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev-rev'), call('api_revision', sha1_git='123', context='prev-rev/rev-id'), call('api_revision', sha1_git='456')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_context_some_prev_list(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_context_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev1-rev/prev0-rev/log/'), 'parents': ['123'], 'parent_urls': ['/api/revision/123/prev/' 'prev1-rev/prev0-rev/rev-id/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev0-rev/prev/prev1-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456']}, context='prev1-rev/prev0-rev') # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev0-rev', context='prev1-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev1-rev/prev0-rev'), call('api_revision', sha1_git='123', context='prev1-rev/prev0-rev/rev-id'), call('api_revision', sha1_git='456')]) @nottest def _url_for_rev_message_test(self, fn, **data): if fn == 'api_revision': if 'context' in data and data['context'] is not None: return '/api/revision/%s/prev/%s/' % (data['sha1_git'], data['context']) # noqa else: return 
'/api/revision/%s/' % data['sha1_git'] elif fn == 'api_revision_log': if 'prev_sha1s' in data and data['prev_sha1s'] is not None: return '/api/revision/%s/prev/%s/log/' % (data['sha1_git'], data['prev_sha1s']) # noqa else: return '/api/revision/%s/log/' % data['sha1_git'] elif fn == 'api_revision_raw_message': return '/api/revision/' + data['sha1_git'] + '/raw/' else: return '/api/revision/' + data['sha1_git_root'] + '/history/' + data['sha1_git'] + '/' # noqa @patch('swh.web.ui.utils.flask') @istest def enrich_revision_with_no_message(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_rev_message_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'message': None, 'parents': ['123'], 'parent_urls': ['/api/revision/123/prev/prev-rev/rev-id/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'message': None, 'parents': ['123'], 'children': ['456'], }, context='prev-rev') # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev-rev'), call('api_revision', sha1_git='123', context='prev-rev/rev-id'), call('api_revision', sha1_git='456')]) @patch('swh.web.ui.utils.flask') @istest def enrich_revision_with_invalid_message(self, mock_flask): # given mock_flask.url_for.side_effect = self._url_for_rev_message_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'message': None, 'message_decoding_failed': True, 'parents': ['123'], 'children': ['456'], }, context='prev-rev') expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': 
'/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'message': None, 'message_decoding_failed': True, 'message_url': '/api/revision/rev-id/raw/', 'parents': ['123'], 'parent_urls': ['/api/revision/123/prev/prev-rev/rev-id/'], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } # then self.assertEqual(actual_revision, expected_revision) mock_flask.url_for.assert_has_calls( [call('api_revision', sha1_git='prev-rev'), call('api_revision', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id'), call('api_revision_log', sha1_git='rev-id', prev_sha1s='prev-rev'), call('api_revision', sha1_git='123', context='prev-rev/rev-id'), call('api_revision', sha1_git='456')]) + + @patch('swh.web.ui.utils.flask') + def next_page(self, mock_flask): + mock_flask.url_for.return_value = '/some/api/url/' + + # when + actual_url = utils.next_page('some-function-name', q='barfoo', page=10) + + # then + self.assertEquals(actual_url, '/some/api/url/?page=11') + + mock_flask.url_for.assert_called_once_with('some-function-name', + q='barfoo') + + @patch('swh.web.ui.utils.flask') + def prev_page(self, mock_flask): + mock_flask.url_for.return_value = '/some/api/url/' + + # when + actual_url = utils.prev_page('some-function-name', q='barfoo', page=10) + + # then + self.assertEquals(actual_url, '/some/api/url/?page=9') + + mock_flask.url_for.assert_called_once_with('some-function-name', + q='barfoo') diff --git a/swh/web/ui/tests/views/test_api.py b/swh/web/ui/tests/views/test_api.py index e7efd98b..282109c1 100644 --- a/swh/web/ui/tests/views/test_api.py +++ b/swh/web/ui/tests/views/test_api.py @@ -1,2241 +1,2247 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import unittest import yaml 
from nose.tools import istest
from unittest.mock import patch, MagicMock

from swh.web.ui.tests import test_app
from swh.web.ui import exc
from swh.web.ui.views import api
from swh.web.ui.exc import NotFoundExc, BadInputExc
from swh.storage.exc import StorageDBError, StorageAPIError


class ApiTestCase(test_app.SWHApiTestCase):
    """Tests for the /api/1/ views, run with the service layer mocked."""

    def setUp(self):
        """Build the origin-visit and origin fixtures shared by the tests."""
        self.origin_visit1 = {
            'date': 1104616800.0,
            'origin': 10,
            'visit': 100,
            'metadata': None,
            'status': 'full',
        }

        self.origin1 = {
            'id': 1234,
            'lister': 'uuid-lister-0',
            'project': 'uuid-project-0',
            'url': 'ftp://some/url/to/origin/0',
            'type': 'ftp'
        }

    # _api_lookup raises NotFoundExc when the lookup function returns None.
    @istest
    def generic_api_lookup_nothing_is_found(self):
        # given
        def test_generic_lookup_fn(sha1, another_unused_arg):
            assert another_unused_arg == 'unused arg'
            assert sha1 == 'sha1'
            return None

        # when
        with self.assertRaises(NotFoundExc) as cm:
            api._api_lookup(
                'sha1', test_generic_lookup_fn,
                'This will be raised because None is returned.',
                lambda x: x,
                'unused arg')

        # then
        # Checked after the with-block: a statement placed after the raising
        # call inside the block would never execute.
        self.assertIn('This will be raised because None is returned.',
                      cm.exception.args[0])

    # A map object returned by the lookup is enriched and returned as a list.
    @istest
    def generic_api_map_are_enriched_and_transformed_to_list(self):
        # given
        def test_generic_lookup_fn_1(criteria0, param0, param1):
            assert criteria0 == 'something'
            return map(lambda x: x + 1, [1, 2, 3])

        # when
        actual_result = api._api_lookup(
            'something', test_generic_lookup_fn_1,
            'This is not the error message you are looking for. Move along.',
            lambda x: x * 2,
            'some param 0', 'some param 1')

        self.assertEqual(actual_result, [4, 6, 8])

    # A list returned by the lookup has the enrich function applied per item.
    @istest
    def generic_api_list_are_enriched_too(self):
        # given
        def test_generic_lookup_fn_2(crit):
            assert crit == 'something'
            return ['a', 'b', 'c']

        # when
        actual_result = api._api_lookup(
            'something', test_generic_lookup_fn_2,
            'Not the error message you are looking for, it is. '
            'Along, you move!',
            lambda x: ''.join(['=', x, '=']))

        self.assertEqual(actual_result, ['=a=', '=b=', '=c='])

    # A generator returned by the lookup is enriched and returned as a list.
    @istest
    def generic_api_generator_are_enriched_and_returned_as_list(self):
        # given
        def test_generic_lookup_fn_3(crit):
            assert crit == 'crit'
            return (i for i in [4, 5, 6])

        # when
        actual_result = api._api_lookup(
            'crit', test_generic_lookup_fn_3,
            'Move!',
            lambda x: x - 1)

        self.assertEqual(actual_result, [3, 4, 5])

    # A single dict returned by the lookup is passed through the enricher.
    @istest
    def generic_api_simple_data_are_enriched_and_returned_too(self):
        # given
        def test_generic_lookup_fn_4(crit):
            assert crit == '123'
            return {'a': 10}

        def test_enrich_data(x):
            x['a'] = x['a'] * 10
            return x

        # when
        actual_result = api._api_lookup(
            '123', test_generic_lookup_fn_4,
            'Nothing to do',
            test_enrich_data)

        self.assertEqual(actual_result, {'a': 100})

    # Filetype hit: 200 JSON response carrying an added content_url.
    @patch('swh.web.ui.views.api.service')
    @istest
    def api_content_filetype(self, mock_service):
        stub_filetype = {
            'mimetype': 'application/xml',
            'encoding': 'ascii',
            'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
        }
        mock_service.lookup_content_filetype.return_value = stub_filetype

        # when
        rv = self.app.get(
            '/api/1/filetype/'
            'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f/')

        # then
        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertEqual(response_data, {
            'mimetype': 'application/xml',
            'encoding': 'ascii',
            'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
            'content_url': '/api/1/content/'
            'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
        })

        mock_service.lookup_content_filetype.assert_called_once_with(
            'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f')

    # Filetype miss (service returns None): 404 JSON error.
    @patch('swh.web.ui.views.api.service')
    @istest
    def api_content_filetype_sha_not_found(self, mock_service):
        # given
        mock_service.lookup_content_filetype.return_value = None

        # when
        rv = self.app.get(
            '/api/1/filetype/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')

        # then
        self.assertEqual(rv.status_code, 404)
        self.assertEqual(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertEqual(response_data, {
            'error': 'No filetype information found for content '
            'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03.'
        })

        mock_service.lookup_content_filetype.assert_called_once_with(
            'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')

    # Language hit: 200 JSON response carrying an added content_url.
    @patch('swh.web.ui.views.api.service')
    @istest
    def api_content_language(self, mock_service):
        stub_language = {
            'lang': 'lisp',
            'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
        }
        mock_service.lookup_content_language.return_value = stub_language

        # when
        rv = self.app.get(
            '/api/1/language/'
            'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f/')

        # then
        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertEqual(response_data, {
            'lang': 'lisp',
            'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
            'content_url': '/api/1/content/'
            'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
        })

        mock_service.lookup_content_language.assert_called_once_with(
            'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f')

    # Language miss (service returns None): 404 JSON error.
    @patch('swh.web.ui.views.api.service')
    @istest
    def api_content_language_sha_not_found(self, mock_service):
        # given
        mock_service.lookup_content_language.return_value = None

        # when
        rv = self.app.get(
            '/api/1/language/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')

        # then
        self.assertEqual(rv.status_code, 404)
        self.assertEqual(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertEqual(response_data, {
            'error': 'No language information found for content '
            'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03.'
        })

        mock_service.lookup_content_language.assert_called_once_with(
            'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')

    # Symbol search on page 2: enriched results plus a paginated Link header.
    @patch('swh.web.ui.views.api.service')
    @istest
    def api_fulltext_search(self, mock_service):
        stub_ctag = [{
            'sha1': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
            'name': 'foobar',
            'kind': 'Haskell',
            'line': 10,
        }]
        mock_service.lookup_expression.return_value = stub_ctag

        # when
        rv = self.app.get('/api/1/symbol/foo/?page=2')

        # then
        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertEqual(response_data, [{
            'sha1': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
            'name': 'foobar',
            'kind': 'Haskell',
            'line': 10,
            'data_url': '/api/1/content/'
            'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/',
            'license_url': '/api/1/license/'
            'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
            'language_url': '/api/1/language/'
            'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
            'filetype_url': '/api/1/filetype/'
            'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
        }])
        actual_headers = dict(rv.headers)
        # NOTE(review): the '<url>' part of each link was missing from this
        # expectation. It is reconstructed here assuming the next/previous
        # links are this endpoint with ?page=3 and ?page=1 -- TODO confirm
        # against the view.
        self.assertEqual(actual_headers['Link'],
                         '</api/1/symbol/foo/?page=3>; rel="next",'
                         '</api/1/symbol/foo/?page=1>; rel="previous"')

        mock_service.lookup_expression.assert_called_once_with('foo', 2)

    # Symbol search with no match: 404 JSON error and no Link header.
    @patch('swh.web.ui.views.api.service')
    @istest
    def api_fulltext_search_not_found(self, mock_service):
        # given
        mock_service.lookup_expression.return_value = []

        # when
        rv = self.app.get('/api/1/symbol/bar/?page=2')

        # then
        self.assertEqual(rv.status_code, 404)
        self.assertEqual(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertEqual(response_data, {
            'error': 'No indexed raw content match expression \'bar\'.'
        })
        actual_headers = dict(rv.headers)
        self.assertNotIn('Link', actual_headers)

        mock_service.lookup_expression.assert_called_once_with('bar', 2)

    # License hit: 200 JSON response carrying an added content_url.
    @patch('swh.web.ui.views.api.service')
    @istest
    def api_content_license(self, mock_service):
        stub_license = {
            'licenses': ['No_license_found', 'Apache-2.0'],
            'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
            'tool_name': 'nomos',
        }
        mock_service.lookup_content_license.return_value = stub_license

        # when
        rv = self.app.get(
            '/api/1/license/'
            'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f/')

        # then
        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertEqual(response_data, {
            'licenses': ['No_license_found', 'Apache-2.0'],
            'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
            'tool_name': 'nomos',
            'content_url': '/api/1/content/'
            'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
        })

        mock_service.lookup_content_license.assert_called_once_with(
            'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f')

    # License miss (service returns None): 404 JSON error.
    @patch('swh.web.ui.views.api.service')
    @istest
    def api_content_license_sha_not_found(self, mock_service):
        # given
        mock_service.lookup_content_license.return_value = None

        # when
        rv = self.app.get(
            '/api/1/license/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')

        # then
        self.assertEqual(rv.status_code, 404)
        self.assertEqual(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertEqual(response_data, {
            'error': 'No license information found for content '
            'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03.'
        })

        mock_service.lookup_content_license.assert_called_once_with(
            'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')

    # Provenance hit: each entry gains origin/visit/revision/content URLs.
    @patch('swh.web.ui.views.api.service')
    @istest
    def api_content_provenance(self, mock_service):
        stub_provenances = [{
            'origin': 1,
            'visit': 2,
            'revision': 'b04caf10e9535160d90e874b45aa426de762f19f',
            'content': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
            'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html'
        }]
        mock_service.lookup_content_provenance.return_value = stub_provenances

        # when
        rv = self.app.get(
            '/api/1/provenance/'
            'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')

        # then
        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertEqual(response_data, [{
            'origin': 1,
            'visit': 2,
            'origin_url': '/api/1/origin/1/',
            'origin_visits_url': '/api/1/origin/1/visits/',
            'origin_visit_url': '/api/1/origin/1/visits/2/',
            'revision': 'b04caf10e9535160d90e874b45aa426de762f19f',
            'revision_url': '/api/1/revision/'
            'b04caf10e9535160d90e874b45aa426de762f19f/',
            'content': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
            'content_url': '/api/1/content/'
            'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
            'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html'
        }])

        mock_service.lookup_content_provenance.assert_called_once_with(
            'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03')

    # Provenance miss (service returns None): 404 JSON error.
    @patch('swh.web.ui.views.api.service')
    @istest
    def api_content_provenance_sha_not_found(self, mock_service):
        # given
        mock_service.lookup_content_provenance.return_value = None

        # when
        rv = self.app.get(
            '/api/1/provenance/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')

        # then
        self.assertEqual(rv.status_code, 404)
        self.assertEqual(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertEqual(response_data, {
            'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6'
            '6c5b00a6d03 not found.'
        })

        mock_service.lookup_content_provenance.assert_called_once_with(
            'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')

    # Content metadata hit: hashes plus data/filetype/language/license URLs.
    @patch('swh.web.ui.views.api.service')
    @istest
    def api_content_metadata(self, mock_service):
        # given
        mock_service.lookup_content.return_value = {
            'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
            'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882',
            'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560'
            'cde9b067a4f',
            'length': 17,
            'status': 'visible'
        }

        # when
        rv = self.app.get(
            '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')

        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertEqual(response_data, {
            'data_url': '/api/1/content/'
            'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/',
            'filetype_url': '/api/1/filetype/'
            'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
            'language_url': '/api/1/language/'
            'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
            'license_url': '/api/1/license/'
            'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
            'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
            'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882',
            'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560c'
            'de9b067a4f',
            'length': 17,
            'status': 'visible'
        })

        mock_service.lookup_content.assert_called_once_with(
            'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')

    # Content miss rendered as JSON: 404, and provenance is never looked up.
    @patch('swh.web.ui.views.api.service')
    @istest
    def api_content_not_found_as_json(self, mock_service):
        # given
        mock_service.lookup_content.return_value = None
        mock_service.lookup_content_provenance = MagicMock()

        # when
        rv = self.app.get(
            '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
            'be4735637006560c/')

        self.assertEqual(rv.status_code, 404)
        self.assertEqual(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertEqual(response_data, {
            'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79'
            '68b3be4735637006560c not found.'
        })

        mock_service.lookup_content.assert_called_once_with(
            'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
            'be4735637006560c')
        # Assert rather than assign: "called = False" only overwrote the
        # mock's flag and verified nothing.
        self.assertFalse(mock_service.lookup_content_provenance.called)

    # Content miss rendered as YAML: 404, and provenance is never looked up.
    @patch('swh.web.ui.views.api.service')
    @istest
    def api_content_not_found_as_yaml(self, mock_service):
        # given
        mock_service.lookup_content.return_value = None
        mock_service.lookup_content_provenance = MagicMock()

        # when
        rv = self.app.get(
            '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
            'be4735637006560c/',
            headers={'accept': 'application/yaml'})

        self.assertEqual(rv.status_code, 404)
        self.assertEqual(rv.mimetype, 'application/yaml')

        # safe_load: the payload is plain data, no arbitrary-object loader
        # is needed.
        response_data = yaml.safe_load(rv.data.decode('utf-8'))
        self.assertEqual(response_data, {
            'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79'
            '68b3be4735637006560c not found.'
        })

        mock_service.lookup_content.assert_called_once_with(
            'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
            'be4735637006560c')
        # Assert rather than assign: "called = False" only overwrote the
        # mock's flag and verified nothing.
        self.assertFalse(mock_service.lookup_content_provenance.called)

    # Raw content miss (service returns None): 404 JSON error.
    @patch('swh.web.ui.views.api.service')
    @istest
    def api_content_raw_ko_not_found(self, mock_service):
        # given
        mock_service.lookup_content_raw.return_value = None

        # when
        rv = self.app.get(
            '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
            '/raw/')

        self.assertEqual(rv.status_code, 404)
        self.assertEqual(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertEqual(response_data, {
            'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6'
            '6c5b00a6d03 not found.'
        })

        mock_service.lookup_content_raw.assert_called_once_with(
            'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')

    # Raw content hit: bytes are returned as application/octet-stream.
    @patch('swh.web.ui.views.api.service')
    @istest
    def api_content_raw(self, mock_service):
        # given
        stub_content = {'data': b'some content data'}
        mock_service.lookup_content_raw.return_value = stub_content

        # when
        rv = self.app.get(
            '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
            '/raw/',
            headers={'Content-type': 'application/octet-stream',
                     'Content-disposition': 'attachment'})

        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv.mimetype, 'application/octet-stream')
        self.assertEqual(rv.data, stub_content['data'])

        mock_service.lookup_content_raw.assert_called_once_with(
            'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')

    # Hash search hit rendered as JSON: stats report 1 file, 100 percent.
    @patch('swh.web.ui.views.api.service')
    @istest
    def api_search(self, mock_service):
        # given
        mock_service.search_hash.return_value = {'found': True}

        expected_result = {
            'search_stats': {'nbfiles': 1, 'pct': 100},
            'search_res': [{'filename': None,
                            'sha1': 'sha1:blah',
                            'found': True}]
        }

        # when
        rv = self.app.get('/api/1/content/search/sha1:blah/')

        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv.mimetype, 'application/json')
        response_data = json.loads(rv.data.decode('utf-8'))
        self.assertEqual(response_data, expected_result)

        mock_service.search_hash.assert_called_once_with('sha1:blah')

    # Hash search hit rendered as YAML when the Accept header asks for it.
    @patch('swh.web.ui.views.api.service')
    @istest
    def api_search_as_yaml(self, mock_service):
        # given
        mock_service.search_hash.return_value = {'found': True}

        expected_result = {
            'search_stats': {'nbfiles': 1, 'pct': 100},
            'search_res': [{'filename': None,
                            'sha1': 'sha1:halb',
                            'found': True}]
        }

        # when
        rv = self.app.get('/api/1/content/search/sha1:halb/',
                          headers={'Accept': 'application/yaml'})

        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv.mimetype, 'application/yaml')

        # safe_load: the payload is plain data, no arbitrary-object loader
        # is needed.
        response_data = yaml.safe_load(rv.data.decode('utf-8'))
        self.assertEqual(response_data, expected_result)

        mock_service.search_hash.assert_called_once_with('sha1:halb')
@patch('swh.web.ui.views.api.service') @istest def api_search_not_found(self, mock_service): # given mock_service.search_hash.return_value = {'found': False} expected_result = { 'search_stats': {'nbfiles': 1, 'pct': 0}, 'search_res': [{'filename': None, 'sha1': 'sha1:halb', 'found': False}] } # when rv = self.app.get('/api/1/content/search/sha1:halb/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_result) mock_service.search_hash.assert_called_once_with('sha1:halb') @patch('swh.web.ui.views.api.service') @istest def api_1_stat_counters_raise_error(self, mock_service): # given mock_service.stat_counters.side_effect = ValueError( 'voluntary error to check the bad request middleware.') # when rv = self.app.get('/api/1/stat/counters/') # then self.assertEquals(rv.status_code, 400) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'voluntary error to check the bad request middleware.'}) @patch('swh.web.ui.views.api.service') @istest def api_1_stat_counters_raise_swh_storage_error_db(self, mock_service): # given mock_service.stat_counters.side_effect = StorageDBError( 'SWH Storage exploded! Will be back online shortly!') # when rv = self.app.get('/api/1/stat/counters/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'An unexpected error occurred in the backend: ' 'SWH Storage exploded! Will be back online shortly!'}) @patch('swh.web.ui.views.api.service') @istest def api_1_stat_counters_raise_swh_storage_error_api(self, mock_service): # given mock_service.stat_counters.side_effect = StorageAPIError( 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' 
) # when rv = self.app.get('/api/1/stat/counters/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'An unexpected error occurred in the api backend: ' 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' }) @patch('swh.web.ui.views.api.service') @istest def api_1_stat_counters(self, mock_service): # given stub_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } mock_service.stat_counters.return_value = stub_stats # when rv = self.app.get('/api/1/stat/counters/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_stats) mock_service.stat_counters.assert_called_once_with() @patch('swh.web.ui.views.api.service') @istest def api_1_lookup_origin_visits_raise_error(self, mock_service): # given mock_service.lookup_origin_visits.side_effect = ValueError( 'voluntary error to check the bad request middleware.') # when rv = self.app.get('/api/1/origin/2/visits/') # then self.assertEquals(rv.status_code, 400) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'voluntary error to check the bad request middleware.'}) @patch('swh.web.ui.views.api.service') @istest def api_1_lookup_origin_visits_raise_swh_storage_error_db( self, mock_service): # given mock_service.lookup_origin_visits.side_effect = StorageDBError( 'SWH Storage exploded! 
Will be back online shortly!') # when rv = self.app.get('/api/1/origin/2/visits/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'An unexpected error occurred in the backend: ' 'SWH Storage exploded! Will be back online shortly!'}) @patch('swh.web.ui.views.api.service') @istest def api_1_lookup_origin_visits_raise_swh_storage_error_api( self, mock_service): # given mock_service.lookup_origin_visits.side_effect = StorageAPIError( 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' ) # when rv = self.app.get('/api/1/origin/2/visits/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'An unexpected error occurred in the api backend: ' 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' 
}) @patch('swh.web.ui.views.api.service') @istest def api_1_lookup_origin_visits(self, mock_service): # given stub_visits = [ { 'date': 1104616800.0, 'origin': 1, 'visit': 1 }, { 'date': 1293919200.0, 'origin': 1, 'visit': 2 }, { 'date': 1420149600.0, 'origin': 1, 'visit': 3 } ] mock_service.lookup_origin_visits.return_value = stub_visits # when rv = self.app.get('/api/1/origin/2/visits/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, [ { 'date': 1104616800.0, 'origin': 1, 'visit': 1, 'origin_visit_url': '/api/1/origin/1/visits/1/', }, { 'date': 1293919200.0, 'origin': 1, 'visit': 2, 'origin_visit_url': '/api/1/origin/1/visits/2/', }, { 'date': 1420149600.0, 'origin': 1, 'visit': 3, 'origin_visit_url': '/api/1/origin/1/visits/3/', } ]) mock_service.lookup_origin_visits.assert_called_once_with(2) @patch('swh.web.ui.views.api.service') @istest def api_1_lookup_origin_visit(self, mock_service): # given origin_visit = self.origin_visit1.copy() origin_visit.update({ 'occurrences': { 'master': { 'target_type': 'revision', 'target': 'revision-id', } } }) mock_service.lookup_origin_visit.return_value = origin_visit expected_origin_visit = self.origin_visit1.copy() expected_origin_visit.update({ 'origin_url': '/api/1/origin/10/', 'occurrences': { 'master': { 'target_type': 'revision', 'target': 'revision-id', 'target_url': '/api/1/revision/revision-id/' } } }) # when rv = self.app.get('/api/1/origin/10/visits/100/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_origin_visit) mock_service.lookup_origin_visit.assert_called_once_with(10, 100) @patch('swh.web.ui.views.api.service') @istest def api_1_lookup_origin_visit_not_found(self, mock_service): # given mock_service.lookup_origin_visit.return_value = None # when 
rv = self.app.get('/api/1/origin/1/visits/1000/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'No visit 1000 for origin 1 found' }) mock_service.lookup_origin_visit.assert_called_once_with(1, 1000) @patch('swh.web.ui.views.api.service') @istest def api_origin_by_id(self, mock_service): # given mock_service.lookup_origin.return_value = self.origin1 expected_origin = self.origin1.copy() expected_origin.update({ 'origin_visits_url': '/api/1/origin/1234/visits/' }) # when rv = self.app.get('/api/1/origin/1234/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_origin) mock_service.lookup_origin.assert_called_with({'id': 1234}) @patch('swh.web.ui.views.api.service') @istest def api_origin_by_type_url(self, mock_service): # given stub_origin = self.origin1.copy() stub_origin.update({ 'id': 987 }) mock_service.lookup_origin.return_value = stub_origin expected_origin = stub_origin.copy() expected_origin.update({ 'origin_visits_url': '/api/1/origin/987/visits/' }) # when rv = self.app.get('/api/1/origin/ftp/url/ftp://some/url/to/origin/0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_origin) mock_service.lookup_origin.assert_called_with( {'url': 'ftp://some/url/to/origin/0', 'type': 'ftp'}) @patch('swh.web.ui.views.api.service') @istest def api_origin_not_found(self, mock_service): # given mock_service.lookup_origin.return_value = None # when rv = self.app.get('/api/1/origin/4321/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) 
self.assertEquals(response_data, { 'error': 'Origin with id 4321 not found.' }) mock_service.lookup_origin.assert_called_with({'id': 4321}) @patch('swh.web.ui.views.api.service') @istest def api_release(self, mock_service): # given stub_release = { 'id': 'release-0', 'target_type': 'revision', 'target': 'revision-sha1', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } expected_release = { 'id': 'release-0', 'target_type': 'revision', 'target': 'revision-sha1', 'target_url': '/api/1/revision/revision-sha1/', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } mock_service.lookup_release.return_value = stub_release # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_release) mock_service.lookup_release.assert_called_once_with('release-0') @patch('swh.web.ui.views.api.service') @istest def api_release_target_type_not_a_revision(self, mock_service): # given stub_release = { 'id': 'release-0', 'target_type': 'other-stuff', 'target': 'other-stuff-checksum', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } expected_release = { 'id': 'release-0', 'target_type': 'other-stuff', 'target': 'other-stuff-checksum', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } mock_service.lookup_release.return_value = stub_release # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, 
expected_release) mock_service.lookup_release.assert_called_once_with('release-0') @patch('swh.web.ui.views.api.service') @istest def api_release_not_found(self, mock_service): # given mock_service.lookup_release.return_value = None # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Release with sha1_git release-0 not found.' }) @patch('swh.web.ui.views.api.service') @istest def api_revision(self, mock_service): # given stub_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, } mock_service.lookup_revision.return_value = stub_revision expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233e' 'ff7371d5/log/', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6' 'a42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic 
revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [ '8734ef7e7c357ce2af928115c6c6a42b7e2a44e7' ], 'parent_urls': [ '/api/1/revision/8734ef7e7c357ce2af928115c6c6a42b7e2a44e7' '/prev/18d8be353ed3480476f032475e7c233eff7371d5/' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, } # when rv = self.app.get('/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_revision) mock_service.lookup_revision.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.views.api.service') @istest def api_revision_not_found(self, mock_service): # given mock_service.lookup_revision.return_value = None # when rv = self.app.get('/api/1/revision/revision-0/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Revision with sha1_git revision-0 not found.'}) @patch('swh.web.ui.views.api.service') @istest def api_revision_raw_ok(self, mock_service): # given stub_revision = {'message': 'synthetic revision message'} mock_service.lookup_revision_message.return_value = stub_revision # when rv = self.app.get('/api/1/revision/18d8be353ed3480476f032475e7c2' '33eff7371d5/raw/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/octet-stream') self.assertEquals(rv.data, b'synthetic revision message') mock_service.lookup_revision_message.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') 
# /raw/ endpoint: the service raising NotFoundExc (no message stored for the
# revision) must surface as a 404 JSON error carrying the exception text.
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_raw_ok_no_msg(self, mock_service):
    # given
    mock_service.lookup_revision_message.side_effect = NotFoundExc(
        'No message for revision')

    # when
    rv = self.app.get('/api/1/revision/'
                      '18d8be353ed3480476f032475e7c233eff7371d5/raw/')

    # then
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'No message for revision'})

    mock_service.lookup_revision_message.assert_called_once_with(
        '18d8be353ed3480476f032475e7c233eff7371d5')


# /raw/ endpoint: same 404 JSON contract when the revision itself is unknown.
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_raw_ko_no_rev(self, mock_service):
    # given
    mock_service.lookup_revision_message.side_effect = NotFoundExc(
        'No revision found')

    # when
    rv = self.app.get('/api/1/revision/'
                      '18d8be353ed3480476f032475e7c233eff7371d5/raw/')

    # then
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'No revision found'})

    mock_service.lookup_revision_message.assert_called_once_with(
        '18d8be353ed3480476f032475e7c233eff7371d5')


# Lookup by origin: a None result from the service becomes a 404 and the
# lookup is made with the default branch and no timestamp.
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_with_origin_not_found(self, mock_service):
    # given
    mock_service.lookup_revision_by.return_value = None

    # when
    rv = self.app.get('/api/1/revision/origin/123/')

    # then
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertIn('Revision with (origin_id: 123', response_data['error'])
    self.assertIn('not found', response_data['error'])

    mock_service.lookup_revision_by.assert_called_once_with(
        123, 'refs/heads/master', None)


# Lookup by origin only: the revision comes back enriched with its urls.
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_with_origin(self, mock_service):
    # given
    mock_revision = {
        'id': '32',
        'directory': '21',
        'message': 'message 1',
        'type': 'deb',
    }
    expected_revision = {
        'id': '32',
        'url': '/api/1/revision/32/',
        'history_url': '/api/1/revision/32/log/',
        'directory': '21',
        'directory_url': '/api/1/directory/21/',
        'message': 'message 1',
        'type': 'deb',
    }
    mock_service.lookup_revision_by.return_value = mock_revision

    # when
    rv = self.app.get('/api/1/revision/origin/1/')

    # then
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_revision)

    mock_service.lookup_revision_by.assert_called_once_with(
        1, 'refs/heads/master', None)


# Lookup by origin + branch: the branch path segment is forwarded verbatim.
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_with_origin_and_branch_name(self, mock_service):
    # given
    mock_revision = {
        'id': '12',
        'directory': '23',
        'message': 'message 2',
        'type': 'tar',
    }
    mock_service.lookup_revision_by.return_value = mock_revision

    expected_revision = {
        'id': '12',
        'url': '/api/1/revision/12/',
        'history_url': '/api/1/revision/12/log/',
        'directory': '23',
        'directory_url': '/api/1/directory/23/',
        'message': 'message 2',
        'type': 'tar',
    }

    # when
    rv = self.app.get('/api/1/revision/origin/1/branch/refs/origin/dev/')

    # then
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_revision)

    mock_service.lookup_revision_by.assert_called_once_with(
        1, 'refs/origin/dev', None)


# Lookup by origin + branch + timestamp: the raw ts segment goes through
# utils.parse_timestamp and the parsed value is what reaches the service.
@patch('swh.web.ui.views.api.service')
@patch('swh.web.ui.views.api.utils')
@istest
def api_revision_with_origin_and_branch_name_and_timestamp(self,
                                                           mock_utils,
                                                           mock_service):
    # given
    mock_revision = {
        'id': '123',
        'directory': '456',
        'message': 'message 3',
        'type': 'tar',
    }
    mock_service.lookup_revision_by.return_value = mock_revision

    expected_revision = {
        'id': '123',
        'url': '/api/1/revision/123/',
        'history_url': '/api/1/revision/123/log/',
        'directory': '456',
        'directory_url': '/api/1/directory/456/',
        'message': 'message 3',
        'type': 'tar',
    }

    mock_utils.parse_timestamp.return_value = 'parsed-date'
    mock_utils.enrich_revision.return_value = expected_revision

    # when
    rv = self.app.get('/api/1/revision'
                      '/origin/1'
                      '/branch/refs/origin/dev'
                      '/ts/1452591542/')

    # then
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_revision)

    mock_service.lookup_revision_by.assert_called_once_with(
        1, 'refs/origin/dev', 'parsed-date')
    mock_utils.parse_timestamp.assert_called_once_with('1452591542')
    mock_utils.enrich_revision.assert_called_once_with(
        mock_revision)


# Same as above with percent-encoded branch and timestamp segments: both
# must be url-decoded before reaching parse_timestamp / the service.
@patch('swh.web.ui.views.api.service')
@patch('swh.web.ui.views.api.utils')
@istest
def api_revision_with_origin_and_branch_name_and_timestamp_with_escapes(
        self,
        mock_utils,
        mock_service):
    # given
    mock_revision = {
        'id': '999',
    }
    mock_service.lookup_revision_by.return_value = mock_revision

    expected_revision = {
        'id': '999',
        'url': '/api/1/revision/999/',
        'history_url': '/api/1/revision/999/log/',
    }

    mock_utils.parse_timestamp.return_value = 'parsed-date'
    mock_utils.enrich_revision.return_value = expected_revision

    # when
    rv = self.app.get('/api/1/revision'
                      '/origin/1'
                      '/branch/refs%2Forigin%2Fdev'
                      '/ts/Today%20is%20'
                      'January%201,%202047%20at%208:21:00AM/')

    # then
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_revision)

    mock_service.lookup_revision_by.assert_called_once_with(
        1, 'refs/origin/dev', 'parsed-date')
    mock_utils.parse_timestamp.assert_called_once_with(
        'Today is January 1, 2047 at 8:21:00AM')
    mock_utils.enrich_revision.assert_called_once_with(
        mock_revision)


# _revision_directory_by lets the service's NotFoundExc propagate and uses
# the default limit=100 / with_data=False when the caller gives neither.
@patch('swh.web.ui.views.api.service')
@istest
def revision_directory_by_ko_raise(self, mock_service):
    # given
    mock_service.lookup_directory_through_revision.side_effect = NotFoundExc('not')  # noqa

    # when
    with self.assertRaises(NotFoundExc):
        api._revision_directory_by(
            {'sha1_git': 'id'},
            None,
            '/api/1/revision/sha1/directory/')

    # then
    mock_service.lookup_directory_through_revision.assert_called_once_with(
        {'sha1_git': 'id'},
        None, limit=100, with_data=False)


# _revision_directory_by returns the directory payload of the
# (revision id, payload) pair handed back by the service.
@patch('swh.web.ui.views.api.service')
@istest
def revision_directory_by_type_dir(self, mock_service):
    # given
    mock_service.lookup_directory_through_revision.return_value = (
        'rev-id',
        {
            'type': 'dir',
            'revision': 'rev-id',
            'path': 'some/path',
            'content': []
        })

    # when
    actual_dir_content = api._revision_directory_by(
        {'sha1_git': 'blah-id'},
        'some/path',
        '/api/1/revision/sha1/directory/')

    # then
    self.assertEqual(actual_dir_content, {
        'type': 'dir',
        'revision': 'rev-id',
        'path': 'some/path',
        'content': []
    })

    mock_service.lookup_directory_through_revision.assert_called_once_with(
        {'sha1_git': 'blah-id'},
        'some/path', limit=100, with_data=False)


# Same for a file payload; explicit limit / with_data are forwarded as given.
@patch('swh.web.ui.views.api.service')
@istest
def revision_directory_by_type_file(self, mock_service):
    # given
    mock_service.lookup_directory_through_revision.return_value = (
        'rev-id',
        {
            'type': 'file',
            'revision': 'rev-id',
            'path': 'some/path',
            'content': {'blah': 'blah'}
        })

    # when
    actual_dir_content = api._revision_directory_by(
        {'sha1_git': 'sha1'},
        'some/path',
        '/api/1/revision/origin/2/directory/',
        limit=1000, with_data=True)

    # then
    self.assertEqual(actual_dir_content, {
        'type': 'file',
        'revision': 'rev-id',
        'path': 'some/path',
        'content': {'blah': 'blah'}
    })

    mock_service.lookup_directory_through_revision.assert_called_once_with(
        {'sha1_git': 'sha1'},
        'some/path', limit=1000, with_data=True)


# Directory through origin/branch/ts: NotFoundExc from the helper becomes a
# 404, and the helper receives the parsed timestamp plus the request url.
@patch('swh.web.ui.views.api.utils')
@patch('swh.web.ui.views.api._revision_directory_by')
@istest
def api_directory_through_revision_origin_ko_not_found(self,
                                                       mock_rev_dir,
                                                       mock_utils):
    # given
    mock_rev_dir.side_effect = NotFoundExc('not found')
    mock_utils.parse_timestamp.return_value = '2012-10-20 00:00:00'

    # when
    rv = self.app.get('/api/1/revision'
                      '/origin/10'
                      '/branch/refs/remote/origin/dev'
                      '/ts/2012-10-20'
                      '/directory/')

    # then
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'not found'})

    mock_rev_dir.assert_called_once_with(
        {'origin_id': 10,
         'branch_name': 'refs/remote/origin/dev',
         'ts': '2012-10-20 00:00:00'}, None,
        '/api/1/revision'
        '/origin/10'
        '/branch/refs/remote/origin/dev'
        '/ts/2012-10-20'
        '/directory/',
        with_data=False)


# Directory through origin only: defaults are master branch and ts=None.
@patch('swh.web.ui.views.api._revision_directory_by')
@istest
def api_directory_through_revision_origin(self,
                                          mock_revision_dir):
    # given
    expected_res = [{
        'id': '123'
    }]
    mock_revision_dir.return_value = expected_res

    # when
    rv = self.app.get('/api/1/revision/origin/3/directory/')

    # then
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_res)

    mock_revision_dir.assert_called_once_with({
        'origin_id': 3,
        'branch_name': 'refs/heads/master',
        'ts': None}, None, '/api/1/revision/origin/3/directory/',
        with_data=False)


# Revision log: a single revision comes back enriched, with no next page.
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log(self, mock_service):
    # given
    stub_revisions = [{
        'id': '18d8be353ed3480476f032475e7c233eff7371d5',
        'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
        'author_name': 'Software Heritage',
        'author_email': 'robot@softwareheritage.org',
        'committer_name': 'Software Heritage',
        'committer_email': 'robot@softwareheritage.org',
        'message': 'synthetic revision message',
        'date_offset': 0,
        'committer_date_offset': 0,
        'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
        'type': 'tar',
        'synthetic': True,
    }]
    mock_service.lookup_revision_log.return_value = stub_revisions

    expected_revisions = [{
        'id': '18d8be353ed3480476f032475e7c233eff7371d5',
        'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/',
        'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef'
        'f7371d5/log/',
        'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
        'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a'
        '42b7e2a44e6/',
        'author_name': 'Software Heritage',
        'author_email': 'robot@softwareheritage.org',
        'committer_name': 'Software Heritage',
        'committer_email': 'robot@softwareheritage.org',
        'message': 'synthetic revision message',
        'date_offset': 0,
        'committer_date_offset': 0,
        'parents': [
            '7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
        ],
        'parent_urls': [
            '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
            '/prev/18d8be353ed3480476f032475e7c233eff7371d5/'
        ],
        'type': 'tar',
        'synthetic': True,
    }]

    expected_response = {
        'revisions': expected_revisions,
        'next_revs_url': None
    }

    # when
    rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42'
                      'b7e2a44e6/log/')

    # then
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_response)

    mock_service.lookup_revision_log.assert_called_once_with(
        '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26)


# Revision log pagination: when the service returns 26 revisions, the first
# 25 are listed and the 26th only supplies next_revs_url.
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_with_next(self, mock_service):
    # given
    stub_revisions = []
    for i in range(27):
        stub_revisions.append({'id': i})

    mock_service.lookup_revision_log.return_value = stub_revisions[:26]

    # NOTE: expected_revisions holds the very same dict objects as
    # stub_revisions, so the url keys added below are also present in the
    # mocked return value. Statement order is kept as-is on purpose.
    expected_revisions = [x for x in stub_revisions if x['id'] < 25]
    for e in expected_revisions:
        e['url'] = '/api/1/revision/%s/' % e['id']
        e['history_url'] = '/api/1/revision/%s/log/' % e['id']

    expected_response = {
        'revisions': expected_revisions,
        'next_revs_url': '/api/1/revision/25/log/'
    }

    # when
    rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42'
                      'b7e2a44e6/log/')

    # then
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_response)

    mock_service.lookup_revision_log.assert_called_once_with(
        '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26)


# Revision log: a None result from the service becomes a 404 JSON error.
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_not_found(self, mock_service):
    # given
    mock_service.lookup_revision_log.return_value = None

    # when
    rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42b7'
                      'e2a44e6/log/')

    # then
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'Revision with sha1_git'
        ' 8834ef7e7c357ce2af928115c6c6a42b7e2a44e6 not found.'})

    mock_service.lookup_revision_log.assert_called_once_with(
        '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26)


# Revision log with navigation context (/prev/<ids>/log/): the context
# revisions are looked up separately and listed before the log itself.
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_context(self, mock_service):
    # given
    stub_revisions = [{
        'id': '18d8be353ed3480476f032475e7c233eff7371d5',
        'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
        'author_name': 'Software Heritage',
        'author_email': 'robot@softwareheritage.org',
        'committer_name': 'Software Heritage',
        'committer_email': 'robot@softwareheritage.org',
        'message': 'synthetic revision message',
        'date_offset': 0,
        'committer_date_offset': 0,
        'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
        'type': 'tar',
        'synthetic': True,
    }]

    mock_service.lookup_revision_log.return_value = stub_revisions
    mock_service.lookup_revision_multiple.return_value = [{
        'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
        'directory': '18d8be353ed3480476f032475e7c233eff7371d5',
        'author_name': 'Name Surname',
        'author_email': 'name@surname.com',
        'committer_name': 'Name Surname',
        'committer_email': 'name@surname.com',
        'message': 'amazing revision message',
        'date_offset': 0,
        'committer_date_offset': 0,
        'parents': ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'],
        'type': 'tar',
        'synthetic': True,
    }]

    expected_revisions = [
        {
            'url': '/api/1/revision/'
            '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/',
            'history_url': '/api/1/revision/'
            '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/log/',
            'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
            'directory': '18d8be353ed3480476f032475e7c233eff7371d5',
            'directory_url': '/api/1/directory/'
            '18d8be353ed3480476f032475e7c233eff7371d5/',
            'author_name': 'Name Surname',
            'author_email': 'name@surname.com',
            'committer_name': 'Name Surname',
            'committer_email': 'name@surname.com',
            'message': 'amazing revision message',
            'date_offset': 0,
            'committer_date_offset': 0,
            'parents': ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'],
            'parent_urls': [
                '/api/1/revision/adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'
                '/prev/7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/'
            ],
            'type': 'tar',
            'synthetic': True,
        },
        {
            'url': '/api/1/revision/'
            '18d8be353ed3480476f032475e7c233eff7371d5/',
            'history_url': '/api/1/revision/'
            '18d8be353ed3480476f032475e7c233eff7371d5/log/',
            'id': '18d8be353ed3480476f032475e7c233eff7371d5',
            'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
            'directory_url': '/api/1/directory/'
            '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/',
            'author_name': 'Software Heritage',
            'author_email': 'robot@softwareheritage.org',
            'committer_name': 'Software Heritage',
            'committer_email': 'robot@softwareheritage.org',
            'message': 'synthetic revision message',
            'date_offset': 0,
            'committer_date_offset': 0,
            'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
            'parent_urls': [
                '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
                '/prev/18d8be353ed3480476f032475e7c233eff7371d5/'
            ],
            'type': 'tar',
            'synthetic': True,
        }]

    expected_response = {
        'revisions': expected_revisions,
        'next_revs_url': None
    }

    # when
    rv = self.app.get('/api/1/revision/18d8be353ed3480476f0'
                      '32475e7c233eff7371d5/prev/prev-rev/log/')

    # then
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_response)

    mock_service.lookup_revision_log.assert_called_once_with(
        '18d8be353ed3480476f032475e7c233eff7371d5', 26)
    mock_service.lookup_revision_multiple.assert_called_once_with(
        ['prev-rev'])


# Revision log by origin: defaults to master branch, no timestamp, and the
# same 26-revision fetch size as the sha1-based log.
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_by(self, mock_service):
    # given
    stub_revisions = [{
        'id': '18d8be353ed3480476f032475e7c233eff7371d5',
        'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
        'author_name': 'Software Heritage',
        'author_email': 'robot@softwareheritage.org',
        'committer_name': 'Software Heritage',
        'committer_email': 'robot@softwareheritage.org',
        'message': 'synthetic revision message',
        'date_offset': 0,
        'committer_date_offset': 0,
        'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
        'type': 'tar',
        'synthetic': True,
    }]
    mock_service.lookup_revision_log_by.return_value = stub_revisions

    expected_revisions = [{
        'id': '18d8be353ed3480476f032475e7c233eff7371d5',
        'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/',
        'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef'
        'f7371d5/log/',
        'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
        'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a'
        '42b7e2a44e6/',
        'author_name': 'Software Heritage',
        'author_email': 'robot@softwareheritage.org',
        'committer_name': 'Software Heritage',
        'committer_email': 'robot@softwareheritage.org',
        'message': 'synthetic revision message',
        'date_offset': 0,
        'committer_date_offset': 0,
        'parents': [
            '7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
        ],
        'parent_urls': [
            '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
            '/prev/18d8be353ed3480476f032475e7c233eff7371d5/'
        ],
        'type': 'tar',
        'synthetic': True,
    }]

    expected_result = {
        'revisions': expected_revisions,
        'next_revs_url': None
    }

    # when
    rv = self.app.get('/api/1/revision/origin/1/log/')

    # then
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_result)

    mock_service.lookup_revision_log_by.assert_called_once_with(
        1, 'refs/heads/master', None, 26)
@patch('swh.web.ui.views.api.service') @istest def api_revision_log_by_with_next(self, mock_service): # given stub_revisions = [] for i in range(27): stub_revisions.append({'id': i}) mock_service.lookup_revision_log_by.return_value = stub_revisions[:26] expected_revisions = [x for x in stub_revisions if x['id'] < 25] for e in expected_revisions: e['url'] = '/api/1/revision/%s/' % e['id'] e['history_url'] = '/api/1/revision/%s/log/' % e['id'] expected_response = { 'revisions': expected_revisions, 'next_revs_url': '/api/1/revision/25/log/' } # when rv = self.app.get('/api/1/revision/origin/1/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_response) mock_service.lookup_revision_log_by.assert_called_once_with( 1, 'refs/heads/master', None, 26) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_by_norev(self, mock_service): # given mock_service.lookup_revision_log_by.side_effect = NotFoundExc( 'No revision') # when rv = self.app.get('/api/1/revision/origin/1/log/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, {'error': 'No revision'}) mock_service.lookup_revision_log_by.assert_called_once_with( 1, 'refs/heads/master', None, 26) @patch('swh.web.ui.views.api.service') @istest def api_revision_history(self, mock_service): # for readability purposes, we use: # - sha1 as 3 letters (url are way too long otherwise to respect pep8) # - only keys with modification steps (all other keys are kept as is) # given stub_revision = { 'id': '883', 'children': ['777', '999'], 'parents': [], 'directory': '272' } mock_service.lookup_revision.return_value = stub_revision # then rv = self.app.get('/api/1/revision/883/prev/999/') self.assertEquals(rv.status_code, 200) 
self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'id': '883', 'url': '/api/1/revision/883/', 'history_url': '/api/1/revision/883/log/', 'history_context_url': '/api/1/revision/883/prev/999/log/', 'children': ['777', '999'], 'children_urls': ['/api/1/revision/777/', '/api/1/revision/999/'], 'parents': [], 'parent_urls': [], 'directory': '272', 'directory_url': '/api/1/directory/272/' }) mock_service.lookup_revision.assert_called_once_with('883') @patch('swh.web.ui.views.api._revision_directory_by') @istest def api_revision_directory_ko_not_found(self, mock_rev_dir): # given mock_rev_dir.side_effect = NotFoundExc('Not found') # then rv = self.app.get('/api/1/revision/999/directory/some/path/to/dir/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Not found'}) mock_rev_dir.assert_called_once_with( {'sha1_git': '999'}, 'some/path/to/dir', '/api/1/revision/999/directory/some/path/to/dir/', with_data=False) @patch('swh.web.ui.views.api._revision_directory_by') @istest def api_revision_directory_ok_returns_dir_entries(self, mock_rev_dir): stub_dir = { 'type': 'dir', 'revision': '999', 'content': [ { 'sha1_git': '789', 'type': 'file', 'target': '101', 'target_url': '/api/1/content/sha1_git:101/', 'name': 'somefile', 'file_url': '/api/1/revision/999/directory/some/path/' 'somefile/' }, { 'sha1_git': '123', 'type': 'dir', 'target': '456', 'target_url': '/api/1/directory/456/', 'name': 'to-subdir', 'dir_url': '/api/1/revision/999/directory/some/path/' 'to-subdir/', }] } # given mock_rev_dir.return_value = stub_dir # then rv = self.app.get('/api/1/revision/999/directory/some/path/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) 
self.assertEquals(response_data, stub_dir) mock_rev_dir.assert_called_once_with( {'sha1_git': '999'}, 'some/path', '/api/1/revision/999/directory/some/path/', with_data=False) @patch('swh.web.ui.views.api._revision_directory_by') @istest def api_revision_directory_ok_returns_content(self, mock_rev_dir): stub_content = { 'type': 'file', 'revision': '999', 'content': { 'sha1_git': '789', 'sha1': '101', 'data_url': '/api/1/content/101/raw/', } } # given mock_rev_dir.return_value = stub_content # then url = '/api/1/revision/666/directory/some/other/path/' rv = self.app.get(url) self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_content) mock_rev_dir.assert_called_once_with( {'sha1_git': '666'}, 'some/other/path', url, with_data=False) @patch('swh.web.ui.views.api.service') @istest def api_person(self, mock_service): # given stub_person = { 'id': '198003', 'name': 'Software Heritage', 'email': 'robot@softwareheritage.org', } mock_service.lookup_person.return_value = stub_person # when rv = self.app.get('/api/1/person/198003/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_person) @patch('swh.web.ui.views.api.service') @istest def api_person_not_found(self, mock_service): # given mock_service.lookup_person.return_value = None # when rv = self.app.get('/api/1/person/666/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Person with id 666 not found.'}) @patch('swh.web.ui.views.api.service') @istest def api_directory(self, mock_service): # given stub_directories = [ { 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', 'type': 'file', 'target': 
'4568be353ed3480476f032475e7c233eff737123', }, { 'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737', 'type': 'dir', 'target': '8be353ed3480476f032475e7c233eff737123456', }] expected_directories = [ { 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', 'type': 'file', 'target': '4568be353ed3480476f032475e7c233eff737123', 'target_url': '/api/1/content/' 'sha1_git:4568be353ed3480476f032475e7c233eff737123/', }, { 'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737', 'type': 'dir', 'target': '8be353ed3480476f032475e7c233eff737123456', 'target_url': '/api/1/directory/8be353ed3480476f032475e7c233eff737123456/', }] mock_service.lookup_directory.return_value = stub_directories # when rv = self.app.get('/api/1/directory/' '18d8be353ed3480476f032475e7c233eff7371d5/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_directories) mock_service.lookup_directory.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.views.api.service') @istest def api_directory_not_found(self, mock_service): # given mock_service.lookup_directory.return_value = [] # when rv = self.app.get('/api/1/directory/' '66618d8be353ed3480476f032475e7c233eff737/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Directory with sha1_git ' '66618d8be353ed3480476f032475e7c233eff737 not found.'}) @patch('swh.web.ui.views.api.service') @istest def api_directory_with_path_found(self, mock_service): # given expected_dir = { 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', 'type': 'file', 'name': 'bla', 'target': '4568be353ed3480476f032475e7c233eff737123', 'target_url': '/api/1/content/' 'sha1_git:4568be353ed3480476f032475e7c233eff737123/', } 
mock_service.lookup_directory_with_path.return_value = expected_dir # when rv = self.app.get('/api/1/directory/' '18d8be353ed3480476f032475e7c233eff7371d5/bla/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_dir) mock_service.lookup_directory_with_path.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5', 'bla') @patch('swh.web.ui.views.api.service') @istest def api_directory_with_path_not_found(self, mock_service): # given mock_service.lookup_directory_with_path.return_value = None path = 'some/path/to/dir/' # when rv = self.app.get(('/api/1/directory/' '66618d8be353ed3480476f032475e7c233eff737/%s') % path) path = path.strip('/') # Path stripped of lead/trail separators # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': (('Entry with path %s relative to ' 'directory with sha1_git ' '66618d8be353ed3480476f032475e7c233eff737 not found.') % path)}) @patch('swh.web.ui.views.api.service') @istest def api_lookup_entity_by_uuid_not_found(self, mock_service): # when mock_service.lookup_entity_by_uuid.return_value = [] # when rv = self.app.get('/api/1/entity/' '5f4d4c51-498a-4e28-88b3-b3e4e8396cba/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': "Entity with uuid '5f4d4c51-498a-4e28-88b3-b3e4e8396cba' not " + "found."}) mock_service.lookup_entity_by_uuid.assert_called_once_with( '5f4d4c51-498a-4e28-88b3-b3e4e8396cba') @patch('swh.web.ui.views.api.service') @istest def api_lookup_entity_by_uuid_bad_request(self, mock_service): # when mock_service.lookup_entity_by_uuid.side_effect = BadInputExc( 'bad input: uuid malformed!') # when rv = 
self.app.get('/api/1/entity/uuid malformed/') self.assertEquals(rv.status_code, 400) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'bad input: uuid malformed!'}) mock_service.lookup_entity_by_uuid.assert_called_once_with( 'uuid malformed') @patch('swh.web.ui.views.api.service') @istest def api_lookup_entity_by_uuid(self, mock_service): # when stub_entities = [ { 'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4', 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2' }, { 'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2' } ] mock_service.lookup_entity_by_uuid.return_value = stub_entities expected_entities = [ { 'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4', 'uuid_url': '/api/1/entity/34bd6b1b-463f-43e5-a697-' '785107f598e4/', 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2', 'parent_url': '/api/1/entity/aee991a0-f8d7-4295-a201-' 'd1ce2efc9fb2/' }, { 'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2', 'uuid_url': '/api/1/entity/aee991a0-f8d7-4295-a201-' 'd1ce2efc9fb2/' } ] # when rv = self.app.get('/api/1/entity' '/34bd6b1b-463f-43e5-a697-785107f598e4/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_entities) mock_service.lookup_entity_by_uuid.assert_called_once_with( '34bd6b1b-463f-43e5-a697-785107f598e4') class ApiUtils(unittest.TestCase): @istest def api_lookup_not_found(self): # when with self.assertRaises(exc.NotFoundExc) as e: api._api_lookup('something', lambda x: None, 'this is the error message raised as it is None') self.assertEqual(e.exception.args[0], 'this is the error message raised as it is None') @istest def api_lookup_with_result(self): # when actual_result = api._api_lookup('something', lambda x: x + '!', 'this is the error which won\'t be ' 'used here') self.assertEqual(actual_result, 'something!') @istest def 
api_lookup_with_result_as_map(self): # when actual_result = api._api_lookup([1, 2, 3], lambda x: map(lambda y: y+1, x), 'this is the error which won\'t be ' 'used here') self.assertEqual(actual_result, [2, 3, 4]) diff --git a/swh/web/ui/utils.py b/swh/web/ui/utils.py index 5a07084a..c82a13f0 100644 --- a/swh/web/ui/utils.py +++ b/swh/web/ui/utils.py @@ -1,382 +1,400 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import flask import re from dateutil import parser +def next_page(function_name, q, page): + """Compute the next page for function_name with parameter q at page. + + """ + url = flask.url_for(function_name, q=q) + url = '%s?page=%s' % (url, page + 1) + return url + + +def prev_page(function_name, q, page): + """Compute the previous page for function_name with parameter q at page. + + """ + url = flask.url_for(function_name, q=q) + url = '%s?page=%s' % (url, page - 1) + return url + + def filter_endpoints(url_map, prefix_url_rule, blacklist=[]): """Filter endpoints by prefix url rule. Args: - url_map: Url Werkzeug.Map of rules - prefix_url_rule: prefix url string - blacklist: blacklist of some url Returns: Dictionary of url_rule with values methods and endpoint. The key is the url, the associated value is a dictionary of 'methods' (possible http methods) and 'endpoint' (python function) """ out = {} for r in url_map: rule = r['rule'] if rule == prefix_url_rule or rule in blacklist: continue if rule.startswith(prefix_url_rule): out[rule] = {'methods': sorted(map(str, r['methods'])), 'endpoint': r['endpoint']} return out def fmap(f, data): """Map f to data at each level. This must keep the origin data structure type: - map -> map - dict -> dict - list -> list - None -> None Args: f: function that expects one argument. 
data: data to traverse to apply the f function. list, map, dict or bare value. Returns: The same data-structure with modified values by the f function. """ if not data: return data if isinstance(data, map): return map(lambda y: fmap(f, y), (x for x in data)) if isinstance(data, list): return [fmap(f, x) for x in data] if isinstance(data, dict): return {k: fmap(f, v) for (k, v) in data.items()} return f(data) def prepare_data_for_view(data, encoding='utf-8'): def prepare_data(s): # Note: can only be 'data' key with bytes of raw content if isinstance(s, bytes): try: return s.decode(encoding) except: return "Cannot decode the data bytes, try and set another " \ "encoding in the url (e.g. ?encoding=utf8) or " \ "download directly the " \ "content's raw data." if isinstance(s, str): return re.sub(r'/api/1/', r'/browse/', s) return s return fmap(prepare_data, data) def filter_field_keys(data, field_keys): """Given an object instance (directory or list), and a csv field keys to filter on. Return the object instance with filtered keys. Note: Returns obj as is if it's an instance of types not in (dictionary, list) Args: - data: one object (dictionary, list...) to filter. - field_keys: csv or set of keys to filter the object on Returns: obj filtered on field_keys """ if isinstance(data, map): return (filter_field_keys(x, field_keys) for x in data) if isinstance(data, list): return [filter_field_keys(x, field_keys) for x in data] if isinstance(data, dict): return {k: v for (k, v) in data.items() if k in field_keys} return data def person_to_string(person): """Map a person (person, committer, tagger, etc...) to a string. """ return ''.join([person['name'], ' <', person['email'], '>']) def parse_timestamp(timestamp): """Given a time or timestamp (as string), parse the result as datetime. Returns: a timezone-aware datetime representing the parsed value. If the parsed value doesn't specify a timezone, UTC is assumed. 
def enrich_object(object):
    """Enrich an object (revision, release) with link to the 'target' of
    type 'target_type'.

    Args:
        object: An object with target and target_type keys
        (e.g. release, revision)

    Returns:
        A copy of the object (the input is not modified), enriched with
        target_url pointing to the right swh.web.ui.api urls for the
        pointing object (revision, release, content, directory).

        The copy is returned without target_url when either the 'target'
        or 'target_type' key is missing, or when target_type is none of
        the supported types.

    """
    obj = object.copy()
    if 'target' not in obj or 'target_type' not in obj:
        return obj

    target = obj['target']
    target_type = obj['target_type']

    if target_type == 'revision':
        obj['target_url'] = flask.url_for('api_revision', sha1_git=target)
    elif target_type == 'release':
        obj['target_url'] = flask.url_for('api_release', sha1_git=target)
    elif target_type == 'content':
        obj['target_url'] = flask.url_for(
            'api_content_metadata', q='sha1_git:' + target)
    elif target_type == 'directory':
        # api_directory's url parameter is named sha1_git (not q), as for
        # the other callers building directory urls in this module.
        obj['target_url'] = flask.url_for('api_directory', sha1_git=target)

    return obj
def enrich_content(content):
    """Enrich content (in place) with links to:
        - data_url: its raw data
        - filetype_url: its filetype information
        - language_url: its language information
        - license_url: its license information

    The links are built from the first hash found in content, looking
    for 'sha1', 'sha1_git' then 'sha256'. Without any of those keys, the
    content is returned untouched.

    """
    algo = next(
        (name for name in ('sha1', 'sha1_git', 'sha256') if name in content),
        None)
    if algo is None:
        return content

    query = '%s:%s' % (algo, content[algo])
    links = (
        ('data_url', 'api_content_raw'),
        ('filetype_url', 'api_content_filetype'),
        ('language_url', 'api_content_language'),
        ('license_url', 'api_content_license'),
    )
    for key, endpoint in links:
        content[key] = flask.url_for(endpoint, q=query)

    return content
""" context_for_parents = None context_for_children = None url_direct_child = None if not context: return (rev_id, None, None) path_list = _get_path_list(context) context_for_parents = '%s/%s' % (context, rev_id) prev_for_children = path_list[:-1] if len(prev_for_children) > 0: context_for_children = '/'.join(prev_for_children) child_id = path_list[-1] # This commit is not the first commit in the path if context_for_children: url_direct_child = flask.url_for( 'api_revision', sha1_git=child_id, context=context_for_children) # This commit is the first commit in the path else: url_direct_child = flask.url_for( 'api_revision', sha1_git=child_id) return (context_for_parents, context_for_children, url_direct_child) def _make_child_url(rev_children, context): """Helper for enrich_revision: retrieve the list of urls corresponding to the children of the current revision according to the navigation breadcrumbs. Args: rev_children: a list of revision id context: the '/'-separated navigation breadcrumbs Returns: the list of the children urls according to the context """ children = [] for child in rev_children: if context and child != _get_path_list(context)[-1]: children.append(flask.url_for('api_revision', sha1_git=child)) elif not context: children.append(flask.url_for('api_revision', sha1_git=child)) return children def enrich_revision(revision, context=None): """Enrich revision with links where it makes sense (directory, parents). Keep track of the navigation breadcrumbs if they are specified. 
Args: revision: the revision as a dict context: the navigation breadcrumbs as a /-separated string of revision sha1_git """ ctx_parents, ctx_children, url_direct_child = _get_revision_contexts( revision['id'], context) revision['url'] = flask.url_for('api_revision', sha1_git=revision['id']) revision['history_url'] = flask.url_for('api_revision_log', sha1_git=revision['id']) if context: revision['history_context_url'] = flask.url_for( 'api_revision_log', sha1_git=revision['id'], prev_sha1s=context) if 'author' in revision: author = revision['author'] revision['author_url'] = flask.url_for('api_person', person_id=author['id']) if 'committer' in revision: committer = revision['committer'] revision['committer_url'] = flask.url_for('api_person', person_id=committer['id']) if 'directory' in revision: revision['directory_url'] = flask.url_for( 'api_directory', sha1_git=revision['directory']) if 'parents' in revision: parents = [] for parent in revision['parents']: parents.append(flask.url_for('api_revision', sha1_git=parent, context=ctx_parents)) revision['parent_urls'] = parents if 'children' in revision: children = _make_child_url(revision['children'], context) if url_direct_child: children.append(url_direct_child) revision['children_urls'] = children else: if url_direct_child: revision['children_urls'] = [url_direct_child] if 'message_decoding_failed' in revision: revision['message_url'] = flask.url_for( 'api_revision_raw_message', sha1_git=revision['id']) return revision diff --git a/swh/web/ui/views/api.py b/swh/web/ui/views/api.py index 5db09b2b..20d79e5b 100644 --- a/swh/web/ui/views/api.py +++ b/swh/web/ui/views/api.py @@ -1,950 +1,975 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from types import GeneratorType from flask import request, url_for from 
swh.web.ui import service, utils, apidoc as doc from swh.web.ui.exc import NotFoundExc from swh.web.ui.main import app @app.route('/api/1/stat/counters/') @doc.route('/api/1/stat/counters/', noargs=True) @doc.returns(rettype=doc.rettypes.dict, retdoc="A dictionary of SWH's most important statistics") def api_stats(): """Return statistics on SWH storage. """ return service.stat_counters() @app.route('/api/1/origin//visits/') @doc.route('/api/1/origin/visits/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc='The requested SWH origin identifier') @doc.returns(rettype=doc.rettypes.list, retdoc="""All instances of visits of the origin pointed by origin_id as POSIX time since epoch (if visit_id is not defined) """) def api_origin_visits(origin_id): """Return a list of origin visit (dict) for that particular origin including date (visit date as posix timestamp), target, target_type, status, ... """ def _enrich_origin_visit(origin_visit): ov = origin_visit.copy() ov['origin_visit_url'] = url_for('api_origin_visit', origin_id=ov['origin'], visit_id=ov['visit']) return ov return _api_lookup( origin_id, service.lookup_origin_visits, error_msg_if_not_found='No origin %s found' % origin_id, enrich_fn=_enrich_origin_visit) @app.route('/api/1/origin//visits//') @doc.route('/api/1/origin/visits/id/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc='The requested SWH origin identifier') @doc.arg('visit_id', default=1, argtype=doc.argtypes.int, argdoc='The requested SWH origin visit identifier') @doc.returns(rettype=doc.rettypes.list, retdoc="""The single instance visit visit_id of the origin pointed by origin_id as POSIX time since epoch""") def api_origin_visit(origin_id, visit_id): """Return origin visit (dict) for that particular origin including (but not limited to) date (visit date as posix timestamp), target, target_type, status, ... 
@app.route('/api/1/symbol/', methods=['POST'])
@app.route('/api/1/symbol//')
@doc.route('/api/1/symbol/search')
@doc.arg('q',
         default='hello',
         argtype=doc.argtypes.str,
         argdoc="""An expression string to lookup in swh's raw content""")
@doc.returns(rettype=doc.rettypes.list,
             retdoc="""A list of dict whose content matches the expression. Each dict has the following keys: - id (bytes): identifier of the content - name (text): symbol whose content match the expression - kind (text): kind of the symbol that matched - lang (text): Language for that entry - line (int): Number line for the symbol """)
def api_fulltext_search(q=None):
    """Search a content per expression.

    The page to look up is read from the 'page' query parameter
    (defaults to 1).

    Returns:
        A dict with keys:
        - 'results': the enriched symbols matching the expression
        - 'headers': dict holding the 'link-next' url and, past the
          first page, the 'link-prev' url

    """
    page = int(request.args.get('page', '1'))

    def _lookup_expression(exp, page=page):
        return service.lookup_expression(exp, page)

    symbols = _api_lookup(
        q,
        lookup_fn=_lookup_expression,
        error_msg_if_not_found='No indexed raw content match expression \''
                               '%s\'.' % q,
        enrich_fn=utils.enrich_content)

    result = {}
    if symbols:
        headers = {
            'link-next': utils.next_page('api_fulltext_search', q, page),
        }
        # A previous page only exists past the first one.
        if page > 1:
            headers['link-prev'] = utils.prev_page(
                'api_fulltext_search', q, page)
        result['headers'] = headers

    result['results'] = symbols
    return result
""" response = {'search_res': None, 'search_stats': None} search_stats = {'nbfiles': 0, 'pct': 0} search_res = None # Single hash request route if q: r = service.search_hash(q) search_res = [{'filename': None, 'sha1': q, 'found': r['found']}] search_stats['nbfiles'] = 1 search_stats['pct'] = 100 if r['found'] else 0 # Post form submission with many hash requests elif request.method == 'POST': data = request.form queries = [] # Remove potential inputs with no associated value for k, v in data.items(): if v is not None: if k == 'q' and len(v) > 0: queries.append({'filename': None, 'sha1': v}) elif v != '': queries.append({'filename': k, 'sha1': v}) if len(queries) > 0: lookup = service.lookup_multiple_hashes(queries) result = [] for el in lookup: result.append({'filename': el['filename'], 'sha1': el['sha1'], 'found': el['found']}) search_res = result nbfound = len([x for x in lookup if x['found']]) search_stats['nbfiles'] = len(queries) search_stats['pct'] = (nbfound / len(queries))*100 response['search_res'] = search_res response['search_stats'] = search_stats return response def _api_lookup(criteria, lookup_fn, error_msg_if_not_found, enrich_fn=lambda x: x, *args): """Capture a redundant behavior of: - looking up the backend with a criteria (be it an identifier or checksum) passed to the function lookup_fn - if nothing is found, raise an NotFoundExc exception with error message error_msg_if_not_found. - Otherwise if something is returned: - either as list, map or generator, map the enrich_fn function to it and return the resulting data structure as list. - either as dict and pass to enrich_fn and return the dict enriched. Args: - criteria: discriminating criteria to lookup - lookup_fn: function expects one criteria and optional supplementary *args. - error_msg_if_not_found: if nothing matching the criteria is found, raise NotFoundExc with this error message. - enrich_fn: Function to use to enrich the result returned by lookup_fn. 
Default to the identity function if not provided. - *args: supplementary arguments to pass to lookup_fn. Raises: NotFoundExp or whatever `lookup_fn` raises. """ res = lookup_fn(criteria, *args) if not res: raise NotFoundExc(error_msg_if_not_found) if isinstance(res, (map, list, GeneratorType)): return [enrich_fn(x) for x in res] return enrich_fn(res) @app.route('/api/1/origin//') @app.route('/api/1/origin//url//') @doc.route('/api/1/origin/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The origin's SWH origin_id.") @doc.arg('origin_type', default='git', argtype=doc.argtypes.str, argdoc="The origin's type (git, svn..)") @doc.arg('origin_url', default='https://github.com/hylang/hy', argtype=doc.argtypes.path, argdoc="The origin's URL.") @doc.raises(exc=doc.excs.notfound, doc='Raised if origin_id does not correspond to an origin in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the origin identified by origin_id') def api_origin(origin_id=None, origin_type=None, origin_url=None): """Return information about the origin matching the passed criteria. Criteria may be: - An SWH-specific ID, if you already know it - An origin type and its URL, if you do not have the origin's SWH identifier """ ori_dict = { 'id': origin_id, 'type': origin_type, 'url': origin_url } ori_dict = {k: v for k, v in ori_dict.items() if ori_dict[k]} if 'id' in ori_dict: error_msg = 'Origin with id %s not found.' 
@app.route('/api/1/person//')
@doc.route('/api/1/person/')
@doc.arg('person_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc="The person's SWH identifier")
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if person_id does not correspond to a person in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the person identified by person_id')
def api_person(person_id):
    """Return information about person with identifier person_id.

    Raises:
        NotFoundExc (through _api_lookup) if the person lookup returns
        nothing.

    """
    return _api_lookup(
        person_id,
        lookup_fn=service.lookup_person,
        error_msg_if_not_found='Person with id %s not found.' % person_id)
Args: revision: dictionary of criterions representing a revision to lookup path: directory's path to lookup request_path: request path which holds the original context to limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of with_data: indicate to retrieve the content's raw data if path resolves to a content. """ def enrich_directory_local(dir, context_url=request_path): return utils.enrich_directory(dir, context_url) rev_id, result = service.lookup_directory_through_revision( revision, path, limit=limit, with_data=with_data) content = result['content'] if result['type'] == 'dir': # dir_entries result['content'] = list(map(enrich_directory_local, content)) else: # content result['content'] = utils.enrich_content(content) return result @app.route('/api/1/revision' '/origin/' '/directory/') @app.route('/api/1/revision' '/origin/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/directory//') @doc.route('/api/1/revision/origin/directory/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The revision's origin's SWH identifier") @doc.arg('branch_name', default='refs/heads/master', argtype=doc.argtypes.path, argdoc="""The optional branch for the given origin (default to master""") @doc.arg('ts', default='2000-01-17T11:23:54+00:00', argtype=doc.argtypes.ts, argdoc="""Optional timestamp (default to the nearest time crawl of timestamp)""") @doc.arg('path', default='.', argtype=doc.argtypes.path, argdoc='The path to the directory or file to display') @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching the passed criteria was not found""") 
@doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the revision corresponding to the passed criteria""") def api_directory_through_revision_origin(origin_id, branch_name="refs/heads/master", ts=None, path=None, with_data=False): """Display directory or content information through a revision identified by origin/branch/timestamp. """ if ts: ts = utils.parse_timestamp(ts) return _revision_directory_by( { 'origin_id': origin_id, 'branch_name': branch_name, 'ts': ts }, path, request.path, with_data=with_data) @app.route('/api/1/revision' '/origin//') @app.route('/api/1/revision' '/origin/' '/branch//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts//') @app.route('/api/1/revision' '/origin/' '/ts//') @doc.route('/api/1/revision/origin/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The queried revision's origin identifier in SWH") @doc.arg('branch_name', default='refs/heads/master', argtype=doc.argtypes.path, argdoc="""The optional branch for the given origin (default to master)""") @doc.arg('ts', default='2000-01-17T11:23:54+00:00', argtype=doc.argtypes.ts, argdoc="The time at which the queried revision should be constrained") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching given criteria was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the revision identified by the given criteria""") def api_revision_with_origin(origin_id, branch_name="refs/heads/master", ts=None): """Display revision information through its identification by origin/branch/timestamp. """ if ts: ts = utils.parse_timestamp(ts) return _api_lookup( origin_id, service.lookup_revision_by, 'Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' 
% (origin_id, branch_name, ts), utils.enrich_revision, branch_name, ts) @app.route('/api/1/revision//') @app.route('/api/1/revision//prev//') @doc.route('/api/1/revision/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc="The revision's sha1_git identifier") @doc.arg('context', default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a', argtype=doc.argtypes.path, argdoc='The navigation breadcrumbs -- use at your own risk') @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a revision matching sha1_git was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the revision identified by sha1_git') def api_revision(sha1_git, context=None): """Return information about revision with id sha1_git. """ def _enrich_revision(revision, context=context): return utils.enrich_revision(revision, context) return _api_lookup( sha1_git, service.lookup_revision, 'Revision with sha1_git %s not found.' 
% sha1_git, _enrich_revision) @app.route('/api/1/revision//raw/') @doc.route('/api/1/revision/raw/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc="The queried revision's sha1_git identifier") @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a revision matching sha1_git was not found in SWH') @doc.returns(rettype=doc.rettypes.octet_stream, retdoc="""The message of the revision identified by sha1_git as a downloadable octet stream""") def api_revision_raw_message(sha1_git): """Return the raw data of the message of revision identified by sha1_git """ raw = service.lookup_revision_message(sha1_git) return app.response_class(raw['message'], headers={'Content-disposition': 'attachment;' 'filename=rev_%s_raw' % sha1_git}, mimetype='application/octet-stream') @app.route('/api/1/revision//directory/') @app.route('/api/1/revision//directory//') @doc.route('/api/1/revision/directory/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc="The revision's sha1_git identifier.") @doc.arg('dir_path', default='.', argtype=doc.argtypes.path, argdoc='The path from the top level directory') @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching sha1_git was not found in SWH , or if the path specified does not exist""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the directory pointed by revision id sha1-git and dir_path""") def api_revision_directory(sha1_git, dir_path=None, with_data=False): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). 
""" return _revision_directory_by( { 'sha1_git': sha1_git }, dir_path, request.path, with_data=with_data) @app.route('/api/1/revision//log/') @app.route('/api/1/revision//prev//log/') @doc.route('/api/1/revision/log/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc='The sha1_git of the revision queried') @doc.arg('prev_sha1s', default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a', argtype=doc.argtypes.path, argdoc='The navigation breadcrumbs -- use at your own risk!') @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git or prev_sha1s is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a revision matching sha1_git was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc="""The log data starting at the revision identified by sha1_git, completed with the navigation breadcrumbs, if any""") def api_revision_log(sha1_git, prev_sha1s=None): """Show all revisions (~git log) starting from sha1_git. The first element returned is the given sha1_git, or the first breadcrumb, if any. """ limit = app.config['conf']['max_log_revs'] response = {'revisions': None, 'next_revs_url': None} revisions = None next_revs_url = None def lookup_revision_log_with_limit(s, limit=limit+1): return service.lookup_revision_log(s, limit) error_msg = 'Revision with sha1_git %s not found.' 
% sha1_git rev_get = _api_lookup(sha1_git, lookup_fn=lookup_revision_log_with_limit, error_msg_if_not_found=error_msg, enrich_fn=utils.enrich_revision) if len(rev_get) == limit+1: rev_backward = rev_get[:-1] next_revs_url = url_for('api_revision_log', sha1_git=rev_get[-1]['id']) else: rev_backward = rev_get if not prev_sha1s: # no nav breadcrumbs, so we're done revisions = rev_backward else: rev_forward_ids = prev_sha1s.split('/') rev_forward = _api_lookup(rev_forward_ids, lookup_fn=service.lookup_revision_multiple, error_msg_if_not_found=error_msg, enrich_fn=utils.enrich_revision) revisions = rev_forward + rev_backward response['revisions'] = revisions response['next_revs_url'] = next_revs_url return response @app.route('/api/1/revision' '/origin//log/') @app.route('/api/1/revision' '/origin/' '/branch//log/') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts//log/') @app.route('/api/1/revision' '/origin/' '/ts//log/') @doc.route('/api/1/revision/origin/log/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The revision's SWH origin identifier") @doc.arg('branch_name', default='refs/heads/master', argtype=doc.argtypes.path, argdoc="The revision's branch name within the origin specified") @doc.arg('ts', default='2000-01-17T11:23:54+00:00', argtype=doc.argtypes.ts, argdoc="""A time or timestamp string to parse""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching the given criteria was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the revision log starting at the revision matching the given criteria.""") def api_revision_log_by(origin_id, branch_name='refs/heads/master', ts=None): """Show all revisions (~git log) starting from the revision described by its origin_id, optional branch name and timestamp. The first element returned is the described revision. 
@app.route('/api/1/directory//')
@app.route('/api/1/directory///')
@doc.route('/api/1/directory/')
@doc.arg('sha1_git',
         default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
         argtype=doc.argtypes.sha1_git,
         argdoc="The queried directory's corresponding sha1_git hash")
@doc.arg('path',
         default='.',
         argtype=doc.argtypes.path,
         argdoc="A path relative to the queried directory's top level")
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a directory matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata and contents of the directory identified by sha1_git""")
def api_directory(sha1_git, path=None):
    """Return information about directory with id sha1_git.

    When path is provided, look up the entry at that path relative to
    the directory instead of the directory itself.

    Raises:
        NotFoundExc (through _api_lookup) if the lookup returns nothing.

    """
    if path:
        error_msg_path = ('Entry with path %s relative to directory '
                          'with sha1_git %s not found.') % (path, sha1_git)
        return _api_lookup(
            sha1_git,
            service.lookup_directory_with_path,
            error_msg_path,
            utils.enrich_directory,
            path)

    error_msg_nopath = 'Directory with sha1_git %s not found.' % sha1_git
    return _api_lookup(
        sha1_git,
        service.lookup_directory,
        error_msg_nopath,
        utils.enrich_directory)
% q, enrich_fn=_enrich_revision) @app.route('/api/1/filetype//') @doc.route('/api/1/filetype/') @doc.arg('q', default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242', argtype=doc.argtypes.algo_and_hash, argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""") @doc.raises(exc=doc.excs.badinput, doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a content matching the hash was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""Filetype information (dict) for the matched content.""") def api_content_filetype(q): """Return content's filetype information if any. """ return _api_lookup( q, lookup_fn=service.lookup_content_filetype, error_msg_if_not_found='No filetype information found ' 'for content %s.' % q, enrich_fn=utils.enrich_metadata_endpoint) @app.route('/api/1/language//') @doc.route('/api/1/language/') @doc.arg('q', default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242', argtype=doc.argtypes.algo_and_hash, argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""") @doc.raises(exc=doc.excs.badinput, doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a content matching the hash was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""Language information (dict) for the matched content.""") def api_content_language(q): """Return content's language information if any. """ return _api_lookup( q, lookup_fn=service.lookup_content_language, error_msg_if_not_found='No language information found ' 'for content %s.' 
% q, enrich_fn=utils.enrich_metadata_endpoint) @app.route('/api/1/license//') @doc.route('/api/1/license/') @doc.arg('q', default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242', argtype=doc.argtypes.algo_and_hash, argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""") @doc.raises(exc=doc.excs.badinput, doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a content matching the hash was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""License information (dict) for the matched content.""") def api_content_license(q): """Return content's license information if any. """ return _api_lookup( q, lookup_fn=service.lookup_content_license, error_msg_if_not_found='No license information found ' 'for content %s.' % q, enrich_fn=utils.enrich_metadata_endpoint) @app.route('/api/1/content//raw/') @doc.route('/api/1/content/raw/') @doc.arg('q', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=doc.argtypes.algo_and_hash, argdoc="""An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH. Defaults to sha1 in the case of a missing algo_hash """) @doc.raises(exc=doc.excs.badinput, doc='Raised if q is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a content matching q was not found in SWH') @doc.returns(rettype=doc.rettypes.octet_stream, retdoc='The raw content data as an octet stream') def api_content_raw(q): """Return content's raw data if content is found. """ def generate(content): yield content['data'] content = service.lookup_content_raw(q) if not content: raise NotFoundExc('Content with %s not found.' 
% q) return app.response_class(generate(content), headers={'Content-disposition': 'attachment;' 'filename=content_%s_raw' % q}, mimetype='application/octet-stream') @app.route('/api/1/content//') @doc.route('/api/1/content/') @doc.arg('q', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=doc.argtypes.algo_and_hash, argdoc="""An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH. Defaults to sha1 in the case of a missing algo_hash """) @doc.raises(exc=doc.excs.badinput, doc='Raised if q is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a content matching q was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the content identified by q. If content decoding was successful, it also returns the data""") def api_content_metadata(q): """Return content information if content is found. """ return _api_lookup( q, lookup_fn=service.lookup_content, error_msg_if_not_found='Content with %s not found.' % q, enrich_fn=utils.enrich_content) @app.route('/api/1/entity//') @doc.route('/api/1/entity/') @doc.arg('uuid', default='5f4d4c51-498a-4e28-88b3-b3e4e8396cba', argtype=doc.argtypes.uuid, argdoc="The entity's uuid identifier") @doc.raises(exc=doc.excs.badinput, doc='Raised if uuid is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if an entity matching uuid was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the entity identified by uuid') def api_entity_by_uuid(uuid): """Return content information if content is found. """ return _api_lookup( uuid, lookup_fn=service.lookup_entity_by_uuid, error_msg_if_not_found="Entity with uuid '%s' not found." % uuid, enrich_fn=utils.enrich_entity)